Audience: Diverse Background
Time: 2-day workshop (12 hours)
Pre-Requisites: Exposure to Python Programming
Brief Description: Spark is a Distributed Computing Framework used for processing, querying and analysing Big Data. It is open source, and it can run tasks faster than traditional software. It is written in the programming language Scala, but it also has support for other widely used programming languages: Java, Python, R and SQL. In this course, we learn Spark using Python (pySpark).
Aims, Objectives and Intended Learning Outcomes: Before attending this course, you were given instructions on how to register, log in, and start a Cluster and a Notebook on https://databricks.com. You should have completed these steps before starting this course.
By the end of Chapter 1 you should understand and be able to explain:
What Big Data is, and its characteristics
What a Distributed Computing Framework (DCF) is
What Spark is, its relation to DCFs, what it can do, and its advantages and disadvantages
How Spark runs on a Cluster
The optimal way to handle partitioning
What a SparkSession is
By the end of Chapter 2, you should understand and be able to explain:
You should be able to:
By the end of Chapter 3, you should be able to investigate data by displaying:
The data
The data structure
The column names and
Selected parts of the data
By the end of Chapter 4, you should understand what the Join of DataFrames is, and be able to explain the difference between the different joins. You should be able to Join DataFrames together with a:
Left Join
Right Join
Inner Join and
Outer Join.
You should also be able to union DataFrames.
By the end of Chapter 5, you should be able to perform Data querying and manipulation. You should be able to:
Remove, Add, Rename a column
Filter on a condition of a column and on multiple columns
Replace values dependent on conditions
Check for nulls
Drop Duplicates
Aggregate Data
Do Summary Statistics
By the end of Chapter 6, you should be able to:
Use .repartition() and .coalesce() to obtain the optimal partitioning for your DataFrame and the cores you have available. You need to understand what the optimal partitioning is.
Save data in a json or csv format.
By the end of Chapter 7, you should be able to:
Take a sample from the Data
Convert Spark DataFrames into pandas DataFrame
Make a simple scatterplot in Python.
By the end of Chapter 8, you should understand:
By the end of Chapter 9 you should:
Feel confident dealing with data using pySpark.
Be able to combine what we learnt in the previous Chapters to investigate and manipulate Data.
Be able to investigate the current partitioning of your DataFrame, calculate the optimal partitioning based on your DataFrame and your resources, and make the necessary changes to achieve it.
Datasets: department_budget.csv, Travel_to_Work_Areas.csv, Travel_to_Work_Small_Areas.csv, Item_Price_Data.csv, Item_Defect_Dataset.csv, Item_Defect_Dataset_Modified.csv
Libraries: pySpark
Images: Apart from a few bespoke images, most of the images used in this course are taken from https://databricks.com/. Images taken from other sources have a caption under them detailing their source.
Acknowledgements: We would like to thank the people who took the time to review the course material: Arturas Eidukas and Isabela Breton; and those who piloted the course for us and provided exceptionally useful feedback: Sonia Williams, Sonia Mazzi, Matt Wenham, Chris Bonham, and Raul Garcia.
Intended Learning Outcomes: Before attending this course, you were given instructions on how to register, log in, and start a Cluster and a Notebook on https://databricks.com. You should have completed these steps before starting this course.
By the end of Chapter 1 you should understand and be able to explain:
What Big Data is, and its characteristics
What a Distributed Computing Framework (DCF) is
What Spark is, its relation to DCFs, what it can do, and its advantages and disadvantages
How Spark runs on a Cluster
The optimal way to handle partitioning
What a SparkSession is
The term Big Data refers to data sets that are too large and complex for traditional software to handle. Nowadays, such data sets are everywhere and contain information that we want to reveal. Big Data is characterised by high volume, velocity, variety and veracity (how much noise the data set has).
Challenge: Specific technology is required to capture, store, analyse and visualise this kind of data set.
Questions:
What is Big Data?
Name and explain the four characteristics of Big Data.
Distributed Computing is a system where multiple machines are working and communicating simultaneously with each other. It is used when there is a need for fast processing of a huge amount of data. The data is split into discrete, non-overlapping sections, with each machine running operations on one section of data. The cluster as a whole then reports up to the driver any results from the operations.
A famous example of how a DCF works:
Suppose I am in a library and I want to count the exact number of books within an hour. To achieve this, I call in as many people as possible and divide the library into non-overlapping areas, one per person. I ask them to be back just before the hour is up. Once they return, I simply add up their counts to calculate the exact number of books in the library.
Apache Hadoop and Apache Spark are designed to do exactly that. They are designed to make resource allocation and result collecting a straightforward process.
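The library analogy can be sketched in plain Python. This is a toy illustration of the divide-count-combine idea, not Spark itself; names such as `count_section` are ours:

```python
from concurrent.futures import ThreadPoolExecutor

# The "library": 1,000 shelves, each holding some number of books.
shelves = [(3 * i) % 7 + 1 for i in range(1000)]

def count_section(section):
    # Each helper counts the books in their own non-overlapping section.
    return sum(section)

# Divide the shelves into 4 non-overlapping sections, one per helper.
sections = [shelves[i::4] for i in range(4)]

# The helpers work simultaneously and report their subtotals back to us
# (the "driver"); we simply add them up.
with ThreadPoolExecutor(max_workers=4) as pool:
    subtotals = list(pool.map(count_section, sections))

total = sum(subtotals)
assert total == sum(shelves)  # same answer as counting alone
```

The key properties are the same as in the analogy: the sections do not overlap, each worker handles one section, and the driver only has to combine the subtotals.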
Questions:
Explain in your own words what a DCF is.
Give an example of a DCF, like the one mentioned above.
Spark is a relatively new DCF used for processing, querying and analysing Big Data. It is open source, and it can run tasks faster than previous DCFs by using in-memory computation. It is written in the programming language Scala, but it also has support for other widely used programming languages: Java, Python, R and SQL, and it is used as an interface to control entire clusters of computers.
Question:
Spark is inherently scalable, and can handle vast amounts of data beyond the reach of traditional software and data analysis methods. For datasets too large to be held in normal computer memory, operations can be run over the entire data at once.
Spark’s key advantage is the ability to perform in-memory computation. The entire data set can be held in memory across the distributed cluster, and operations and transformations can be applied without having to write to a hard drive. Previous DCFs such as Hadoop had to read from and write to the file system with each transformation, adding a huge time overhead (especially when the data is very large).
Spark also uses Lazy Evaluation - it will only evaluate commands when needed, and will identify the optimal, most efficient way to perform a series of commands. For example, Spark can be tasked with many transformations to manipulate the data, but until a command requiring a direct response - such as a count of the data - is issued, the manipulations are queued up and optimised.
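Lazy evaluation can be loosely mimicked in plain Python with generators: the "transformations" below are only queued, and no work happens until a concrete answer is demanded. This is an analogy only, not Spark's actual machinery:

```python
numbers = range(1_000_000)

# Two queued "transformations" - nothing is computed yet.
doubled = (n * 2 for n in numbers)
evens = (n for n in doubled if n % 4 == 0)

# Only this "action" triggers computation - and only as much work as is
# needed to produce the first five results.
first_five = [next(evens) for _ in range(5)]
print(first_five)  # [0, 4, 8, 12, 16]
```

Note that even though `numbers` has a million elements, only a handful are ever touched - this "do no more work than the answer requires" behaviour is what Spark's lazy evaluation buys you at cluster scale.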
Advantages:
Fast processing of Big Data compared with other DCFs
Capable of handling large data sets where traditional software fails
Supports multiple languages (Scala, Java, R, Python)
Lazy Evaluation: data is evaluated only when an action is called for computation.
Disadvantages and Limitations:
It does not have its own file management system (relying on Hadoop or other platforms). When used with Hadoop, it works best with a limited number of large files, and performance suffers when data ends up split across a lot of small files.
Some error messages are so vague that it is not easy to understand what they mean and where they are coming from.
There are not as many ready-made algorithms as in traditional languages (e.g. Python).
Python and R users are always a step behind in updates compared to Scala and Java users - features can be slow to be implemented.
Question:
Data engineers, data scientists, application developers.
Companies such as TripAdvisor, Yahoo!, eBay, MyFitnessPal, and the ONS.
A Cluster is the group of computers (nodes) which are all connected and coordinated together to perform tasks. Your own laptop could be part of this group of computers, or the one controlling them.
The Driver Process runs the main program, and queues tasks for the cluster to complete. Any results get collected and brought back to the driver.
Spark uses a Cluster Manager to coordinate the work across such a cluster for you; it directs the workers of the cluster as to which of the tasks set by the Driver they should be doing.
The Executors, or workers, do the assigned tasks (executing code) and report back to the Driver.
Image taken from: https://annefou.github.io/pyspark/03-pyspark_context/ by Anne Fouilloux.
Note:
Proper Partitioning: In the diagram, some cores are not used because each worker has more cores than partitions; thus we are wasting resources. We need to think about proper partitioning to optimise those resources.
The importance of partitioning cannot be overstated. Suppose that you have access to 8 cores. When thinking about partitioning, the primary aim should be to have our data equally distributed between the cores.
If we have 8 cores and 1 million records, it stands to reason that we want 125,000 records on each core. Unbalanced partitioning can lead to massive slowdowns. We want the number of partitions to be a multiple of the number of cores, to ensure that every core is being used equally.
In the first diagram below, if we assume that each partition takes 10 seconds to process, the total time will be 20 seconds - 10 seconds for the first 8 partitions, and 10 seconds for the last 2. While the last 2 are processed, 6 cores sit idle.
In the second diagram, there are half as many partitions, each twice as large as before. The processing will still take 20 seconds, but 3 cores will sit idle for the entire time.
In the diagram below, with the data equally split into 8 partitions, each partition will take 12.5 seconds to process. As each core has only 1 partition, the overall processing time will be 12.5 seconds, with no idle cores.
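The arithmetic behind these diagrams can be checked with a short sketch: partitions are processed in "waves" of at most one partition per core, so the total time is the number of waves multiplied by the time per partition. Here `processing_time` is our own illustrative helper, not a Spark function:

```python
import math

def processing_time(n_partitions, n_cores, seconds_per_partition):
    # Partitions are processed in waves of at most n_cores at a time.
    waves = math.ceil(n_partitions / n_cores)
    return waves * seconds_per_partition

# 10 partitions of 10s each on 8 cores: two waves, 6 cores idle in the second.
print(processing_time(10, 8, 10))   # 20
# 5 partitions of 20s each on 8 cores: one wave, but 3 cores idle throughout.
print(processing_time(5, 8, 20))    # 20
# 8 equal partitions of 12.5s on 8 cores: one wave, no idle cores.
print(processing_time(8, 8, 12.5))  # 12.5
```

The three calls reproduce the three diagrams: only the equally-split, multiple-of-cores case avoids wasted time.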
Questions:
Try to explain how Spark runs on a Cluster.
Try to explain how we should handle partitioning.
Spark can be run locally, using just the resources of your laptop/computer, or on a cluster. To run Spark on a local machine you need to install Java. If you want to use the Python API, you will need a Python interpreter and if you want to use R, then you need R on your machine. This can be done without any distributed storage system, and is useful for prototyping, development, and debugging.
To harness the full potential of a fully operational cluster, there are two main providers of cloud-based Spark facilities: Databricks and Cloudera. Databricks has a web-based interface that is available for free in the Databricks Community Edition https://databricks.com/try-databricks, which we will be using today.
Within the ONS, the Data Access Platform runs an internally hosted Cloudera Data Science Workbench (CDSW) environment, which allows user-definable numbers of clusters, memory allocation, and access to the secure data stored within HUE. HUE, or Hadoop User Experience, is a web interface which gives access to SQL databases, tables, and other data files hosted internally within the ONS. Within CDSW, users can write Python, R, or Scala code to interact with clusters.
The first thing a Spark program must do is create a SparkSession object, which tells Spark how to access a cluster. A simple SparkSession is created below, but it can be tailored (see Chapter 8).
Within Databricks, a SparkSession is automatically created with the name spark, ready for use; however, within the ONS Data Service, you will need to create your own.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
You can find pySpark Documentation at https://spark.apache.org/docs/latest/api/python/index.html
Question:
Now, you should understand and be able to explain:
What Big Data is, and its characteristics
What a Distributed Computing Framework (DCF) is
What Spark is, its relation to DCFs, what it can do, and its advantages and disadvantages
How Spark runs on a Cluster
The optimal way to handle partitioning
What a SparkSession is
Intended Learning Outcomes:
By the end of this chapter, you should understand and be able to explain:
You should be able to:
A DataFrame is a collection of data organised into named columns - with variables in columns and observations in rows. They are similar to dataframes in R and Python (from the pandas package), and are built on a foundation of RDDs - as such, they share many characteristics, such as immutability. Fundamentally, DataFrames are an expansion of RDDs - they are more efficient, have a large number of convenient built-in functions, and handle data in a much more structured manner, which is far easier for users to understand intuitively.
DataFrames contain rows of data, with a well-defined schema which illustrates the structure of the data - its column names and data types.
More information can be found at the links below: https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/
Note:
Structured Data - information which can be stored in an SQL database, in tables with rows and columns that are co-related.
Semi-Structured Data - information that is not in a relational database but is organised in a way that makes it easier to analyse, e.g. csv, json and xml files.
Unstructured Data - everything else, such as videos, photos and audio files.
DataFrames can handle structured data, overcoming one of RDDs key limitations.
DataFrames also have APIs available in Python, R, Java, and Scala, and are generally more accessible than RDDs for those new to Spark. However, they don't offer the low-level functionality and control of RDDs.
DataFrames also support different data formats (csv, json, parquet) and storage systems (Hive tables, Cassandra, MySQL, etc.).
As with the rest of Spark, there are two types of operations which can be performed on a DataFrame: transformations and actions. Any method which takes a DataFrame, manipulates it in some way and returns a new version is a transformation, while any method which returns something from within the DataFrame (e.g. a count of records, a print of the first few rows) is an action. Due to Spark's lazy evaluation, transformations are queued up until an action is called.
As such, running code with just transformations will appear to happen instantly, while then running a .show() method to look at the results will take longer - the actual processing occurs at the action call.
From the Python API, DataFrames have much greater performance than RDDs, due to the inclusion of a built-in optimiser. As DataFrames have transformations and actions, transformations will be queued by Spark's in-built optimiser until an action is called. At this point, the optimiser will determine the most efficient way of completing the required transformations.
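A toy model of this behaviour in plain Python - transformations are merely stored, and the queued work only runs when an action such as `count()` is called. `LazyFrame` is our own illustration, not part of Spark:

```python
class LazyFrame:
    """A toy 'DataFrame' that queues transformations until an action is called."""
    def __init__(self, rows, queued=None):
        self.rows = rows
        self.queued = queued or []   # queued transformations, not yet run

    def transform(self, fn):
        # Transformation: queue the function and return a new frame instantly.
        return LazyFrame(self.rows, self.queued + [fn])

    def count(self):
        # Action: only now do all of the queued transformations actually run.
        rows = self.rows
        for fn in self.queued:
            rows = fn(rows)
        return len(rows)

lf = LazyFrame(list(range(100)))
lf = lf.transform(lambda rows: [r * 2 for r in rows])        # instant - just queued
lf = lf.transform(lambda rows: [r for r in rows if r > 50])  # instant - just queued
print(lf.count())  # 74 - the processing happens here, at the action call
```

Spark's optimiser additionally reorders and fuses the queued steps before running them; this sketch only shows the queue-then-run behaviour.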
Question
Advantages
There is no difference in performance between Scala and Python.
Massive performance benefits compared with RDDs.
Much more accessible than RDDs - far simpler to operate and use efficiently.
Drawbacks and Limitations:
Once we have transformed an RDD into a DataFrame we cannot recover the original RDD.
Sometimes have to resort to accessing the underlying RDD for increased capability.
A comprehensive comparison of RDDs and DataFrames can be found here: https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/
Question
A SparkSession is needed to create DataFrames, export DataFrames as tables, and execute SQL over tables. Within Databricks, one is created automatically with the name spark. Within the ONS Data Service, you’ll need to create one manually.
In the last Chapter we will learn how to modify the arguments for the SparkSession.
To create a DataFrame from raw data, define a list of column names, and a list of data, and pass them both to spark.createDataFrame(data, columns). As we can see below, the data should be provided on a row-by-row basis, rather than on a column-by-column basis.
columns = ['id', 'height(cm)', 'weight(kg)']
values = [('A', 160, 60), ('B', 180, 70), ('D', 150, 80)]
DataFrame_from_Scratch = spark.createDataFrame(values, columns)
DataFrame_from_Scratch.show()
## +---+----------+----------+
## | id|height(cm)|weight(kg)|
## +---+----------+----------+
## | A| 160| 60|
## | B| 180| 70|
## | D| 150| 80|
## +---+----------+----------+
Create a dataframe with column ‘department’, containing values ‘Legal’,‘Applied Science’, ‘Head Office’, and ‘Methodology’, and a column ‘employees’, with values: 11, 49, 5, and 27. The dataframe should have column names of ‘department’ and ‘employees’. Name the dataframe something clear and memorable e.g. ‘department_employees’.
Display the contents of the dataframe by using .show(). Ensure that the column names and department names are exactly as specified above, as they will be used in later exercises.
columns = ['department', 'employees']
data = [('Legal', 11),
('Applied Science', 49),
('Head Office', 5),
('Methodology', 27)]
department_employees = spark.createDataFrame(data, columns)
department_employees.show()
## +---------------+---------+
## | department|employees|
## +---------------+---------+
## | Legal| 11|
## |Applied Science| 49|
## | Head Office| 5|
## | Methodology| 27|
## +---------------+---------+
Guidance
Make sure your column names are exactly department and employees. When creating a DataFrame, assign it to a variable and then .show() that new variable. Never do something like the following: problem_dataframe = spark.createDataFrame(data, columns).show()
problem_dataframe will not be a DataFrame - it will be a NoneType object, as it is set equal to the value that .show() returns, and .show() returns nothing!

spark.read.csv(path, header) will read in a CSV file, assign the data to a DataFrame, and, if header is set, use the first row of the file as the column names. There are a huge number of other parameters which can be set, including sep to change the separator from a comma, and inferSchema to control whether Spark should attempt to infer data types automatically from the data.
Item_Price_DataFrame = spark.read.csv('./data/Item_Price_Data.csv', header=True)
Item_Price_DataFrame.show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| 10|
## |White Table| 3| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 499| 20|
## | Couch| 12| 1000| 900| 5|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------------+--------+
Within Databricks, the filepath needed is 'FileStore/tables/filename.csv'.
Within the ONS Data Service, filepaths should start with hdfs://prod1/, with the path from within HUE - the Hadoop User Experience file storage solution - appended, e.g. for the ONS BRES dataset, use hdfs://prod1/dapsen/landing_zone/ons/bres22. Full details can be found at http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv
spark.read.json(path) will read the json file found at path, and assign it to a DataFrame. As with read.csv, there are many different additional variables that can be passed to the function.
Json_File_Item_Price = spark.read.json("./data/Json_File_Item_Price.json")
Json_File_Item_Price.show()
## +----+-----------+---------------+-----+
## |Code| Item|Last_Year_Price|Price|
## +----+-----------+---------------+-----+
## | 22|Black Chair| 70| 100|
## | 3|White Table| 350| 500|
## | 16| Floor Lamp| 50| 60|
## | 12| Couch| 900| 1000|
## | 3|White Table| 499| 500|
## +----+-----------+---------------+-----+
SQL commands can be used to read from tables in databases. The below example will accept any valid SQL expression.
Within the ONS Data Service, most data will be stored in SQL tables, so this will probably be the most common import method you come across. Importantly, SQL tables include metadata such as column datatypes - which aren’t explicitly defined in CSV files.
SQL_Item_Price_example = spark.sql('SELECT * FROM database_name.table_name')
Import the table ‘department_budget’ into a dataframe using the sql example above. This contains department names, divisions, and budget data - name it something suitable e.g. ‘department_budget’.
Display some of the contents of the dataframe, to understand the structure of the data.
department_budget = spark.sql('SELECT * FROM default.department_budget')
department_budget.show()
## +--------------------+-----------+------+
## | department| division|budget|
## +--------------------+-----------+------+
## | HR| Admin| 7852|
## | Finance| Admin| 8541|
## | Legal| Admin| 9656|
## | International| Admin| 1913|
## | IT Services| Technical| 7420|
## | Procurement| Admin| 744|
## | R&D| Scientific| 8389|
## |Software Engineering|Engineering| 6109|
## | Systems Engineering|Engineering| 6564|
## | Legacy| Technical| 2581|
## | Space| Scientific| 7505|
## | Transit|Engineering| 7069|
## | Autonomy|Engineering| 4060|
## | Air|Engineering| 8712|
## | Maritime|Engineering| 8480|
## | Customer Support| Admin| 8293|
## | Training| Technical| 5241|
## | Brexit| Admin| 8886|
## | Planning| Admin| 5484|
## | Executive| Admin| 6967|
## +--------------------+-----------+------+
## only showing top 20 rows
To read in the file from a csv, you could do the following instead:
department_budget = spark.read.csv('./data/department_budget.csv', header = True, inferSchema = True)
where inferSchema = True lets PySpark work out the data types of the columns, and header = True takes the first row and uses it as the column names.
Guidance
The database name will be default, and the table name will be whatever you called it when you imported it - which will by default be department_budget_csv. By going to the data tab on the left hand side of Databricks, you can see your table names, and by mousing over the desired table the entire name should appear. If your columns are named _c0, _c1, you might have incorrectly uploaded the data. Delete the previous table of data by going to the data tab, finding the desired table, pressing the down-arrow to the right of the name and selecting Delete. Then, follow the onboarding instructions to reupload the data, making sure to tick the required check boxes on the left hand side!

Now, you should understand and be able to explain:
You should be able to:
Intended Learning Outcomes:
By the end of this Chapter, you should be able to investigate data by displaying:
The data
The data structure
The column names and
Selected parts of the data
.show(x) will display the first x rows of the DataFrame in a visual format - up to 20 if x is not set.
Item_Price_DataFrame.show(6)
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| 10|
## |White Table| 3| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 499| 20|
## | Couch| 12| 1000| 900| 5|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------------+--------+
If a cell in a column has more than 20 characters, the end will be truncated. This can often hide the exponent of a number with many decimal places, but it can be avoided by passing a second argument to the function - .show(6, False) - where the boolean controls whether or not to truncate.
data = [(1,1234567891234567891234510.9786)]
columns = ['idx', 'high_precision_data']
hpd = spark.createDataFrame(data, columns)
hpd.show()
## +---+--------------------+
## |idx| high_precision_data|
## +---+--------------------+
## | 1|1.234567891234567...|
## +---+--------------------+
hpd.show(1, False)
## +---+---------------------+
## |idx|high_precision_data |
## +---+---------------------+
## |1 |1.2345678912345678E24|
## +---+---------------------+
department_budget.show(4)
## +-------------+--------+------+
## | department|division|budget|
## +-------------+--------+------+
## | HR| Admin| 7852|
## | Finance| Admin| 8541|
## | Legal| Admin| 9656|
## |International| Admin| 1913|
## +-------------+--------+------+
## only showing top 4 rows
Guidance
Replace department_budget with the name of your budget DataFrame. Make sure to give your DataFrames sensible names to make referring back to them easier! Note the message only showing top 4 rows - this means that there are more than 4 rows in the DataFrame, but not how many there are in total!

.collect() will return a list of all rows of a DataFrame - avoid it if the size of the data is larger than the memory of the driver. .take(n) can be used to return only the first n rows of the DataFrame.
print(Item_Price_DataFrame.collect())
## [Row(Item='Black Chair', Code='22', Price='100', Last_Year_Price='70', Quantity='10'), Row(Item='White Table', Code='3', Price='500', Last_Year_Price='350', Quantity='50'), Row(Item='Floor Lamp', Code='16', Price='60', Last_Year_Price='50', Quantity='1'), Row(Item='White Table', Code='3', Price='500', Last_Year_Price='499', Quantity='20'), Row(Item='Couch', Code='12', Price='1000', Last_Year_Price='900', Quantity='5'), Row(Item='White Table', Code='3', Price='500', Last_Year_Price='499', Quantity='20')]
Run .collect() on your department - employees DataFrame, and try to understand the results.
In a new cell, run .take(n) on your department - employees DataFrame, replacing n so that only the first record is returned.
print(department_employees.collect())
## [Row(department='Legal', employees=11), Row(department='Applied Science', employees=49), Row(department='Head Office', employees=5), Row(department='Methodology', employees=27)]
print(department_employees.take(1))
## [Row(department='Legal', employees=11)]
Guidance
.collect() returns a list of Row objects, where each one is equal to a row of the DataFrame. .take(1) just returns the very first row. Be very careful with .collect() if you have a very large DataFrame! It will try to pull all of the data into the driver of your system, which could easily cause the driver to crash!

.printSchema() can be used to print the schema for a DataFrame in a visual tree. A DataFrame’s schema consists of column names, datatypes, and any other key rules for columns.
Item_Price_DataFrame.printSchema()
## root
## |-- Item: string (nullable = true)
## |-- Code: string (nullable = true)
## |-- Price: string (nullable = true)
## |-- Last_Year_Price: string (nullable = true)
## |-- Quantity: string (nullable = true)
.dtypes can also be used, to print out a list of tuples of 'Column name', 'datatype'.
print(Item_Price_DataFrame.dtypes)
## [('Item', 'string'), ('Code', 'string'), ('Price', 'string'), ('Last_Year_Price', 'string'), ('Quantity', 'string')]
Datatypes can be found by using either of the commands above:
department_employees.printSchema()
## root
## |-- department: string (nullable = true)
## |-- employees: long (nullable = true)
print(department_employees.dtypes)
## [('department', 'string'), ('employees', 'bigint')]
Guidance
printSchema says that the employees column is a long, while dtypes claims it is a bigint. Fundamentally, these are the same thing! They represent an integer between -9223372036854775808 and 9223372036854775807!

.columns just returns a list of column names.
print(Item_Price_DataFrame.columns)
## ['Item', 'Code', 'Price', 'Last_Year_Price', 'Quantity']
columns = department_employees.columns
print(columns)
## ['department', 'employees']
.select() can be used to select parts of the data - either a single column, or multiple columns. It takes a column name, multiple column names, or a list object containing column names.
Select a single column.
Item_Price_DataFrame.select("Item").show()
## +-----------+
## | Item|
## +-----------+
## |Black Chair|
## |White Table|
## | Floor Lamp|
## |White Table|
## | Couch|
## |White Table|
## +-----------+
Select two columns.
Item_Price_DataFrame.select("Item","Price").show()
## +-----------+-----+
## | Item|Price|
## +-----------+-----+
## |Black Chair| 100|
## |White Table| 500|
## | Floor Lamp| 60|
## |White Table| 500|
## | Couch| 1000|
## |White Table| 500|
## +-----------+-----+
department_employees.select('department').show()
## +---------------+
## | department|
## +---------------+
## | Legal|
## |Applied Science|
## | Head Office|
## | Methodology|
## +---------------+
department_employees.select(['department']).show()
## +---------------+
## | department|
## +---------------+
## | Legal|
## |Applied Science|
## | Head Office|
## | Methodology|
## +---------------+
department_employees.select(columns[0]).show()
## +---------------+
## | department|
## +---------------+
## | Legal|
## |Applied Science|
## | Head Office|
## | Methodology|
## +---------------+
Guidance
.count() can be used to count the number of records in a DataFrame. When running .count(), it is advisable to assign the output to a variable so it can be accessed later without having to rerun the action.
data_count = Item_Price_DataFrame.count()
print(data_count)
## 6
budget_count = department_budget.count()
print('budget: ', budget_count)
## budget: 33
employees_count = department_employees.count()
print('employees: ', employees_count)
## employees: 4
Guidance
Now, you should be able to investigate data by displaying:
The data
The data structure
The column names and
Selected parts of the data
Intended Learning Outcomes:
By the end of this Chapter, you should understand what the Join of DataFrames is, and be able to explain the difference between the different joins. You should be able to Join DataFrames together with a:
Left Join
Right Join
Inner Join and
Outer Join.
You should also be able to union DataFrames.
Joining DataFrames works the same as joining SQL tables.
Inner Join
Outer Join
Left Join
Right Join
Pictures taken from wikipedia.org
There are many different ways to join DataFrames. The main types are Inner, Left, Right, and Outer.
Let's read in another set of data to join with our previous Item_Price_DataFrame.
Item_Defect_Data = spark.read.csv('./data/Item_Defect_Dataset.csv', header=True)
Item_Defect_Data.show()
## +-----------+----+---------+
## | Item|Code|Defective|
## +-----------+----+---------+
## |Black Chair| 22| 4|
## |White Table| 3| 15|
## | Couch| 12| 3|
## | Red Door| 14| 41|
## +-----------+----+---------+
Left Joins will keep every record in the first DataFrame, and any matching data in the second DataFrame.
dataFrame.join(dataFrame2, on, how) will join dataFrame2 to dataFrame, by the method specified by how, and on the columns specified by on.
.join() will default to an inner join if how is not specified. on specifies the column or columns to join on, and can be expanded extensively - for example, when the DataFrames have different column names.
Item_left_join = Item_Price_DataFrame.join(Item_Defect_Data, on='Item', how='left') # Could also use 'left_outer'
Item_left_join.show()
## +-----------+----+-----+---------------+--------+----+---------+
## | Item|Code|Price|Last_Year_Price|Quantity|Code|Defective|
## +-----------+----+-----+---------------+--------+----+---------+
## |Black Chair| 22| 100| 70| 10| 22| 4|
## |White Table| 3| 500| 350| 50| 3| 15|
## | Floor Lamp| 16| 60| 50| 1|null| null|
## |White Table| 3| 500| 499| 20| 3| 15|
## | Couch| 12| 1000| 900| 5| 12| 3|
## |White Table| 3| 500| 499| 20| 3| 15|
## +-----------+----+-----+---------------+--------+----+---------+
You would like to join your department DataFrames together, keeping only the data for which you have employee values. Will a left join be sufficient? What column should you join on?
Create a new DataFrame for the join, name it something relevant, and show the results.
Note
This is the DataFrame you will be working from for the rest of this course. Don’t overwrite it with any different types of joins.
A Left join will be enough to keep only data where you have employee values (the entirety of department_employees). However, a Right join could also be used, depending on the order of the DataFrames.
department_data = department_employees.join(department_budget, on = 'department', how = 'left')
department_data.show()
## +---------------+---------+----------+------+
## | department|employees| division|budget|
## +---------------+---------+----------+------+
## | Legal| 11| Admin| 9656|
## |Applied Science| 49|Scientific| 4781|
## | Head Office| 5| null| null|
## | Methodology| 27| Technical| 4644|
## +---------------+---------+----------+------+
This second example holds the exact same data, just with a slightly different column order.
department_budget.join(department_employees, on = 'department', how = 'right')
## DataFrame[department: string, division: string, budget: int, employees: bigint]
As we have kept all of the data from department_employees, a few null values appear, as there is no Head Office record in the budget DataFrame! We will resolve these at a later stage.
Guidance
Don't lose your department_data DataFrame! You will need it at the beginning of the next section! If you are overwriting previous DataFrames, like I have above, be very careful never to write a line of code like the following:
department_data = department_data.show()
This will display the DataFrame happily (due to the .show()), but will set department_data equal to None. Any future operations on department_data will fail, throwing an error saying something like: AttributeError: 'NoneType' object has no attribute 'show'. If this occurs, you might have to rerun a large amount of code - from when the DataFrame was first created!
Right Joins will keep every record in the second DataFrame, and any matching data in the first DataFrame.
Item_right_join = Item_Price_DataFrame.join(Item_Defect_Data, on='Item', how='right') # Could also use 'right_outer'
Item_right_join.show()
## +-----------+----+-----+---------------+--------+----+---------+
## | Item|Code|Price|Last_Year_Price|Quantity|Code|Defective|
## +-----------+----+-----+---------------+--------+----+---------+
## |Black Chair| 22| 100| 70| 10| 22| 4|
## |White Table| 3| 500| 499| 20| 3| 15|
## |White Table| 3| 500| 499| 20| 3| 15|
## |White Table| 3| 500| 350| 50| 3| 15|
## | Couch| 12| 1000| 900| 5| 12| 3|
## | Red Door|null| null| null| null| 14| 41|
## +-----------+----+-----+---------------+--------+----+---------+
Join your department DataFrames together with a right join. Keep your DataFrames in the same order as in the above question. How does this differ to your original joint DataFrame? Don't overwrite your left-joined DataFrame from above.
department_employees.join(department_budget, on = 'department', how = 'right').show()
## +--------------------+---------+-----------+------+
## | department|employees| division|budget|
## +--------------------+---------+-----------+------+
## | International| null| Admin| 1913|
## | Customer Support| null| Admin| 8293|
## | Maritime| null|Engineering| 8480|
## | Transit| null|Engineering| 7069|
## | Accounting| null| Admin| 3275|
## | HR| null| Admin| 7852|
## | IT Services| null| Technical| 7420|
## |Facilities Manage...| null| Admin| 114|
## | Legacy| null| Technical| 2581|
## | Finance| null| Admin| 8541|
## | Applied Science| 49| Scientific| 4781|
## | Executive| null| Admin| 6967|
## | Planning| null| Admin| 5484|
## | Production| null|Engineering| 4671|
## | Brexit| null| Admin| 8886|
## |Radio Communications| null| Scientific| 3370|
## | Graphics| null| Technical| 9724|
## | Autonomy| null|Engineering| 4060|
## | Mail| null| Admin| 4575|
## | Economic Statistics| null| Technical| 9629|
## +--------------------+---------+-----------+------+
## only showing top 20 rows
Inner Joins will return only records which can be joined in both DataFrames.
Item_inner_join = Item_Price_DataFrame.join(Item_Defect_Data, on='Item', how='inner')
Item_inner_join.show()
## +-----------+----+-----+---------------+--------+----+---------+
## | Item|Code|Price|Last_Year_Price|Quantity|Code|Defective|
## +-----------+----+-----+---------------+--------+----+---------+
## |Black Chair| 22| 100| 70| 10| 22| 4|
## |White Table| 3| 500| 350| 50| 3| 15|
## |White Table| 3| 500| 499| 20| 3| 15|
## | Couch| 12| 1000| 900| 5| 12| 3|
## |White Table| 3| 500| 499| 20| 3| 15|
## +-----------+----+-----+---------------+--------+----+---------+
Join your department DataFrames together with an inner join. How does this differ to your original DataFrame? Don't overwrite your left-joined DataFrame from above.
department_employees.join(department_budget, on = 'department', how = 'inner').show()
## +---------------+---------+----------+------+
## | department|employees| division|budget|
## +---------------+---------+----------+------+
## | Legal| 11| Admin| 9656|
## |Applied Science| 49|Scientific| 4781|
## | Methodology| 27| Technical| 4644|
## +---------------+---------+----------+------+
Outer Joins will keep all data from both DataFrames, and overlap them where possible.
Item_outer_join = Item_Price_DataFrame.join(Item_Defect_Data, on='Item', how='outer')
Item_outer_join.show()
## +-----------+----+-----+---------------+--------+----+---------+
## | Item|Code|Price|Last_Year_Price|Quantity|Code|Defective|
## +-----------+----+-----+---------------+--------+----+---------+
## | Red Door|null| null| null| null| 14| 41|
## |Black Chair| 22| 100| 70| 10| 22| 4|
## | Couch| 12| 1000| 900| 5| 12| 3|
## | Floor Lamp| 16| 60| 50| 1|null| null|
## |White Table| 3| 500| 350| 50| 3| 15|
## |White Table| 3| 500| 499| 20| 3| 15|
## |White Table| 3| 500| 499| 20| 3| 15|
## +-----------+----+-----+---------------+--------+----+---------+
Join your department DataFrames together with an outer join. How does this differ to your original DataFrame? Don't overwrite your left-joined DataFrame from above.
department_employees.join(department_budget, on = 'department', how = 'outer').show()
## +--------------------+---------+-----------+------+
## | department|employees| division|budget|
## +--------------------+---------+-----------+------+
## | International| null| Admin| 1913|
## | Customer Support| null| Admin| 8293|
## | Maritime| null|Engineering| 8480|
## | Transit| null|Engineering| 7069|
## | Accounting| null| Admin| 3275|
## | HR| null| Admin| 7852|
## | IT Services| null| Technical| 7420|
## |Facilities Manage...| null| Admin| 114|
## | Legacy| null| Technical| 2581|
## | Finance| null| Admin| 8541|
## | Head Office| 5| null| null|
## | Applied Science| 49| Scientific| 4781|
## | Executive| null| Admin| 6967|
## | Planning| null| Admin| 5484|
## | Production| null|Engineering| 4671|
## | Brexit| null| Admin| 8886|
## |Radio Communications| null| Scientific| 3370|
## | Graphics| null| Technical| 9724|
## | Autonomy| null|Engineering| 4060|
## | Mail| null| Admin| 4575|
## +--------------------+---------+-----------+------+
## only showing top 20 rows
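The four join types above differ only in which join keys they keep; the rest is a matter of which side contributes nulls. This can be summarised with plain-Python sets (the item names here are the toy data from the examples):

```python
left_keys = {'Black Chair', 'White Table', 'Floor Lamp', 'Couch'}
right_keys = {'Black Chair', 'White Table', 'Couch', 'Red Door'}

inner_kept = left_keys & right_keys   # inner join: keys present in both
outer_kept = left_keys | right_keys   # outer join: every key from either side
left_kept = left_keys                 # left join: every left key (unmatched rows get nulls)
right_kept = right_keys               # right join: every right key (unmatched rows get nulls)

# The keys that pick up null columns in an outer join:
print(sorted(outer_kept - inner_kept))
```

Floor Lamp and Red Door are exactly the records that appeared with nulls in the left, right, and outer join outputs above.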
While joining combines DataFrames horizontally - adding extra columns - .union() can be used to combine DataFrames vertically - adding extra records.
.union() requires each DataFrame to have the same number of columns, but it doesn't check that the columns have the same name, datatype etc. It simply appends the second DataFrame's rows beneath the first's, matching columns purely by position. As such, ensure that your data format is identical before performing a union - that your columns are in the same order in both DataFrames.
shuffled_Item_Price = Item_Price_DataFrame.select('Code','Price','Quantity','Item','Last_Year_Price')
Item_Price_DataFrame.union(shuffled_Item_Price).show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| 10|
## |White Table| 3| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 499| 20|
## | Couch| 12| 1000| 900| 5|
## |White Table| 3| 500| 499| 20|
## | 22| 100| 10| Black Chair| 70|
## | 3| 500| 50| White Table| 350|
## | 16| 60| 1| Floor Lamp| 50|
## | 3| 500| 20| White Table| 499|
## | 12|1000| 5| Couch| 900|
## | 3| 500| 20| White Table| 499|
## +-----------+----+-----+---------------+--------+
Instead of .union(), .unionByName() can be used. This will look through the column names, and pair each column with its matching column name. Unlike .union(), which doesn’t care about column names at all, .unionByName() requires each DataFrame to have the same column names, but the order can be different.
Item_Price_DataFrame.unionByName(shuffled_Item_Price).show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| 10|
## |White Table| 3| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 499| 20|
## | Couch| 12| 1000| 900| 5|
## |White Table| 3| 500| 499| 20|
## |Black Chair| 22| 100| 70| 10|
## |White Table| 3| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 499| 20|
## | Couch| 12| 1000| 900| 5|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------------+--------+
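The positional-versus-by-name distinction can be mimicked in plain Python, with tuples standing in for position-matched rows and dictionaries for name-matched ones. The helper names here are made up for illustration:

```python
def union_positional(rows_a, rows_b):
    """Like .union(): stack tuples as-is, matching columns purely by position."""
    return rows_a + rows_b

def union_by_name(dicts_a, dicts_b, columns):
    """Like .unionByName(): line every row up on a shared column order first."""
    return [tuple(d[c] for c in columns) for d in dicts_a + dicts_b]

cols = ('Item', 'Price')
a = [{'Item': 'Couch', 'Price': 1000}]
b = [{'Price': 60, 'Item': 'Floor Lamp'}]  # same columns, different order

print(union_by_name(a, b, cols))
```

With the dictionary version, the shuffled column order in b causes no problem - just as .unionByName() repaired the shuffled DataFrame above.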
What would happen if you union your department_employees and department_budget DataFrames? Perform the transformation, and see if the results match your expectations.
Create a new DataFrame containing your department_employees DataFrame unioned to itself, and call it unioned_employees. Show it to observe the results.
You should get an error message when you union the different DataFrames together! But pyspark is helpful here and gives a useful error message. If you have used union, it will tell you that the number of columns in the first DataFrame is not equal to the number of columns in the second DataFrame. If you have tried unionByName, it will tell you that a specific column does not appear in both of the DataFrames!
unioned_employees = department_employees.union(department_employees)
unioned_employees.show()
## +---------------+---------+
## | department|employees|
## +---------------+---------+
## | Legal| 11|
## |Applied Science| 49|
## | Head Office| 5|
## | Methodology| 27|
## | Legal| 11|
## |Applied Science| 49|
## | Head Office| 5|
## | Methodology| 27|
## +---------------+---------+
OR:
unioned_employees = department_employees.unionByName(department_employees)
Guidance
union and unionByName both have their uses. If your column orders are guaranteed to match, union will work best. If the column names match but the order might not, unionByName is better!
All of the joins performed above involved joining two DataFrames on one column, which is present and has the same name in both of the original DataFrames. This can be expanded to multiple columns, or columns without matching names.
Join on different named columns
We’ll import a new example set of data.
Item_Defects_Modified = spark.read.csv('./data/Item_Defect_Dataset_Modified.csv', header = True)
Item_Defects_Modified.show()
## +-----------+---+---------+
## |Description| ID|Defective|
## +-----------+---+---------+
## |Black Chair| 22| 4|
## |White Table| 3| 15|
## | Couch| 12| 3|
## | Red Door| 14| 41|
## |White Table| 4| 3|
## +-----------+---+---------+
To join this data to our previous Item_Price_DataFrame, we need to match up the original Item column, with the new Description column.
When we perform the join, we directly match up column names from specific DataFrames.
Combined_Item_Data = Item_Price_DataFrame.join(Item_Defects_Modified,
on = (Item_Price_DataFrame['Item'] == Item_Defects_Modified['Description']),
how = 'inner')
Combined_Item_Data.show()
## +-----------+----+-----+---------------+--------+-----------+---+---------+
## | Item|Code|Price|Last_Year_Price|Quantity|Description| ID|Defective|
## +-----------+----+-----+---------------+--------+-----------+---+---------+
## |Black Chair| 22| 100| 70| 10|Black Chair| 22| 4|
## |White Table| 3| 500| 350| 50|White Table| 4| 3|
## |White Table| 3| 500| 350| 50|White Table| 3| 15|
## |White Table| 3| 500| 499| 20|White Table| 4| 3|
## |White Table| 3| 500| 499| 20|White Table| 3| 15|
## | Couch| 12| 1000| 900| 5| Couch| 12| 3|
## |White Table| 3| 500| 499| 20|White Table| 4| 3|
## |White Table| 3| 500| 499| 20|White Table| 3| 15|
## +-----------+----+-----+---------------+--------+-----------+---+---------+
This has the unfortunate effect of including both of the joining columns - in the example, Item and Description both appear in the results of the join, and they are identical. Dropping one, though, is a trivial case of passing it to .drop(), as we will see in the next section.
There is a fundamental issue with the results above, due to the White Table records; data has been duplicated, as the join was performed on non-unique data. This can be fixed by joining on both Item/Description, and Code/ID. Multiple conditions should each be wrapped in a bracket, and the entire set of conditions wrapped in brackets.
Combined_Item_Data = Item_Price_DataFrame.join(Item_Defects_Modified,
on = ((Item_Price_DataFrame['Item'] == Item_Defects_Modified['Description']) & (Item_Price_DataFrame['Code'] == Item_Defects_Modified['ID'])),
how = 'inner')
Combined_Item_Data.show()
## +-----------+----+-----+---------------+--------+-----------+---+---------+
## | Item|Code|Price|Last_Year_Price|Quantity|Description| ID|Defective|
## +-----------+----+-----+---------------+--------+-----------+---+---------+
## |Black Chair| 22| 100| 70| 10|Black Chair| 22| 4|
## |White Table| 3| 500| 350| 50|White Table| 3| 15|
## |White Table| 3| 500| 499| 20|White Table| 3| 15|
## | Couch| 12| 1000| 900| 5| Couch| 12| 3|
## |White Table| 3| 500| 499| 20|White Table| 3| 15|
## +-----------+----+-----+---------------+--------+-----------+---+---------+
As each Item/Code pairing is unique, the join occurs as expected. This is very important to watch out for, and can be checked by performing .count()s before and after, and checking that the results seem sensible. In one case, an incorrect join led to a 2TB set of data being created and saved to a database table. Unsurprisingly, writing, reading, and performing operations on this took an incredibly long time.
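The row-count inflation from a non-unique key, and the fix from a composite key, can be demonstrated with a couple of toy tuples in plain Python:

```python
prices = [('White Table', 3, 500), ('White Table', 3, 499)]   # (Item, Code, Price)
defects = [('White Table', 3, 15), ('White Table', 4, 3)]     # (Item, ID, Defective)

# Join on Item only: every price row matches every defect row with that Item.
on_item = [(p, d) for p in prices for d in defects if p[0] == d[0]]

# Join on (Item, Code/ID): each price row matches at most one defect row here.
on_both = [(p, d) for p in prices for d in defects
           if p[0] == d[0] and p[1] == d[1]]

# Comparing the two counts is exactly the .count() sanity check described above.
print(len(on_item), len(on_both))
```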
Linking data is a key part of Data Science, and this brief introduction to joins is just that - a brief introduction. There are a huge number of resources online about joining data, and any techniques used to join SQL tables should translate to pyspark.
Now, you should understand what the Join of DataFrames is, and be able to explain the difference between the different joins. You should, also, be able to Join DataFrames together with a:
Left Join
Right Join
Inner Join and
Outer Join.
You should also be able to union DataFrames.
Intended Learning Outcomes:
By the end of this Chapter, you should be able to perform Data querying and manipulation. You should be able to:
Remove, Add, Rename a column
Filter on a condition of a column and on multiple columns
Replace values dependent on conditions
Check for nulls
Drop Duplicates
Aggregate Data
Do Summary Statistics
When performing operations on DataFrames, there are usually equivalent SQL commands, and Python based functions. DataFrames can be queried using SQL syntax, or Python syntax, whichever is more convenient, and more familiar.
If you are performing transformations to a DataFrame, they will return a DataFrame object. It is generally advisable to save that object to a variable, and then on a new line perform an action (such as .count(), or .show()) on the DataFrame. This allows you to keep using the DataFrame for future work.
Columns can be added to DataFrames by passing a column name and a column object into .withColumn(name, col). Column objects can be created in different ways, but a few examples can be seen below.
Existing columns can have arithmetic applied to them by calling col(column_name) on the columns required.
from pyspark.sql.functions import col
Item_Price_DataFrame.withColumn('Price_Change', col('Price') - col('Last_Year_Price')).show()
## +-----------+----+-----+---------------+--------+------------+
## | Item|Code|Price|Last_Year_Price|Quantity|Price_Change|
## +-----------+----+-----+---------------+--------+------------+
## |Black Chair| 22| 100| 70| 10| 30.0|
## |White Table| 3| 500| 350| 50| 150.0|
## | Floor Lamp| 16| 60| 50| 1| 10.0|
## |White Table| 3| 500| 499| 20| 1.0|
## | Couch| 12| 1000| 900| 5| 100.0|
## |White Table| 3| 500| 499| 20| 1.0|
## +-----------+----+-----+---------------+--------+------------+
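As an analogy, the same per-record derivation in plain Python is a dictionary comprehension over a list of rows (toy data, mirroring the example above):

```python
rows = [
    {'Item': 'Black Chair', 'Price': 100, 'Last_Year_Price': 70},
    {'Item': 'Couch', 'Price': 1000, 'Last_Year_Price': 900},
]

# Derive a new 'column' for every record, like
# .withColumn('Price_Change', col('Price') - col('Last_Year_Price')):
rows = [{**r, 'Price_Change': r['Price'] - r['Last_Year_Price']} for r in rows]

print(rows)
```

The difference in Spark is that the col() expressions are evaluated lazily and in parallel across partitions, rather than in a Python loop.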
lit(value) can be used to add a column with the same value in each record.
from pyspark.sql.functions import lit
Item_Price_DataFrame.withColumn('Constant_Value', lit(4)).show()
## +-----------+----+-----+---------------+--------+--------------+
## | Item|Code|Price|Last_Year_Price|Quantity|Constant_Value|
## +-----------+----+-----+---------------+--------+--------------+
## |Black Chair| 22| 100| 70| 10| 4|
## |White Table| 3| 500| 350| 50| 4|
## | Floor Lamp| 16| 60| 50| 1| 4|
## |White Table| 3| 500| 499| 20| 4|
## | Couch| 12| 1000| 900| 5| 4|
## |White Table| 3| 500| 499| 20| 4|
## +-----------+----+-----+---------------+--------+--------------+
Create a new DataFrame from your joint DataFrame, containing the budget per head for each department.
Create a new DataFrame from your budget per head DataFrame, containing today’s date as a string.
from pyspark.sql.functions import col
department_data = department_data.withColumn('bph', col('budget') / col('employees'))
department_data.show()
## +---------------+---------+----------+------+-----------------+
## | department|employees| division|budget| bph|
## +---------------+---------+----------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Applied Science| 49|Scientific| 4781|97.57142857142857|
## | Head Office| 5| null| null| null|
## | Methodology| 27| Technical| 4644| 172.0|
## +---------------+---------+----------+------+-----------------+
from pyspark.sql.functions import lit
department_data = department_data.withColumn('date', lit('1901-01-01'))
department_data.show()
## +---------------+---------+----------+------+-----------------+----------+
## | department|employees| division|budget| bph| date|
## +---------------+---------+----------+------+-----------------+----------+
## | Legal| 11| Admin| 9656|877.8181818181819|1901-01-01|
## |Applied Science| 49|Scientific| 4781|97.57142857142857|1901-01-01|
## | Head Office| 5| null| null| null|1901-01-01|
## | Methodology| 27| Technical| 4644| 172.0|1901-01-01|
## +---------------+---------+----------+------+-----------------+----------+
Recall the Item_Price_DataFrame
Item_Price_DataFrame.show(6)
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| 10|
## |White Table| 3| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 499| 20|
## | Couch| 12| 1000| 900| 5|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------------+--------+
To remove a column from a DataFrame, use .drop('colname').
Item_Price_DataFrame.drop("Last_Year_Price").show()
## +-----------+----+-----+--------+
## | Item|Code|Price|Quantity|
## +-----------+----+-----+--------+
## |Black Chair| 22| 100| 10|
## |White Table| 3| 500| 50|
## | Floor Lamp| 16| 60| 1|
## |White Table| 3| 500| 20|
## | Couch| 12| 1000| 5|
## |White Table| 3| 500| 20|
## +-----------+----+-----+--------+
.drop() can drop multiple columns, if they are entered as comma separated strings, and not as a list.
Item_Price_DataFrame.drop("Last_Year_Price", "Price").show()
#Item_Price_DataFrame.drop(["Last_Year_Price", "Price"]).show() # RAISES AN ERROR
## +-----------+----+--------+
## | Item|Code|Quantity|
## +-----------+----+--------+
## |Black Chair| 22| 10|
## |White Table| 3| 50|
## | Floor Lamp| 16| 1|
## |White Table| 3| 20|
## | Couch| 12| 5|
## |White Table| 3| 20|
## +-----------+----+--------+
However, Python's List Unpacking method can be used - by passing in a list preceded with an asterisk *, the function will treat every item in the list as a separate parameter being passed in - so .drop(*['Price','Item']) == .drop('Price', 'Item')
Item_Price_DataFrame.drop(*["Last_Year_Price", "Price"]).show()
## +-----------+----+--------+
## | Item|Code|Quantity|
## +-----------+----+--------+
## |Black Chair| 22| 10|
## |White Table| 3| 50|
## | Floor Lamp| 16| 1|
## |White Table| 3| 20|
## | Couch| 12| 5|
## |White Table| 3| 20|
## +-----------+----+--------+
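List unpacking is plain Python, so the mechanism can be checked without Spark at all. The drop function below is a toy stand-in, not the real .drop():

```python
def drop(*columns):
    """Toy stand-in for .drop() that just reports the names it received."""
    return list(columns)

to_drop = ['Last_Year_Price', 'Price']

# Unpacking the list gives exactly the same call as listing the names directly:
assert drop(*to_drop) == drop('Last_Year_Price', 'Price')
print(drop(*to_drop))
```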
department_data = department_data.drop('date')
department_data.show()
## +---------------+---------+----------+------+-----------------+
## | department|employees| division|budget| bph|
## +---------------+---------+----------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Applied Science| 49|Scientific| 4781|97.57142857142857|
## | Head Office| 5| null| null| null|
## | Methodology| 27| Technical| 4644| 172.0|
## +---------------+---------+----------+------+-----------------+
A column can be renamed by passing a string of the old name and new name into .withColumnRenamed(old_name, new_name).
Item_Price_DataFrame.withColumnRenamed("Last_Year_Price","Old_Price").show()
## +-----------+----+-----+---------+--------+
## | Item|Code|Price|Old_Price|Quantity|
## +-----------+----+-----+---------+--------+
## |Black Chair| 22| 100| 70| 10|
## |White Table| 3| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 499| 20|
## | Couch| 12| 1000| 900| 5|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------+--------+
Bulk renames can be done by looping over the column names, and renaming each one.
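For example, the renames can be driven by an old-name to new-name mapping. Here is the idea sketched in plain Python on dictionary rows (hypothetical mapping and data); a pyspark version would loop over the same mapping calling .withColumnRenamed(old, new) each time:

```python
rename_map = {'bph': 'budget_per_head'}  # old name -> new name (hypothetical)

rows = [{'department': 'Legal', 'bph': 877.8}]

# Rename any key found in the mapping; leave all other keys untouched.
renamed = [{rename_map.get(k, k): v for k, v in row.items()} for row in rows]

print(renamed)
```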
Rename your budget-per-head column to budget_per_head. (Or something else, if yours is already called budget_per_head!)
department_data = department_data.withColumnRenamed('bph', 'budget_per_head')
department_data.show()
## +---------------+---------+----------+------+-----------------+
## | department|employees| division|budget| budget_per_head|
## +---------------+---------+----------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Applied Science| 49|Scientific| 4781|97.57142857142857|
## | Head Office| 5| null| null| null|
## | Methodology| 27| Technical| 4644| 172.0|
## +---------------+---------+----------+------+-----------------+
The order of records can be altered by using .orderBy(cols).
Item_Price_DataFrame.orderBy('Item').show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| 10|
## | Couch| 12| 1000| 900| 5|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 350| 50|
## |White Table| 3| 500| 499| 20|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------------+--------+
The keyword argument ascending can be added to determine whether the data is ordered in ascending or descending order: .orderBy(cols, ascending = True)
Item_Price_DataFrame.orderBy('Item', ascending = False).show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |White Table| 3| 500| 499| 20|
## |White Table| 3| 500| 350| 50|
## |White Table| 3| 500| 499| 20|
## | Floor Lamp| 16| 60| 50| 1|
## | Couch| 12| 1000| 900| 5|
## |Black Chair| 22| 100| 70| 10|
## +-----------+----+-----+---------------+--------+
department_data.orderBy('department').show()
## +---------------+---------+----------+------+-----------------+
## | department|employees| division|budget| budget_per_head|
## +---------------+---------+----------+------+-----------------+
## |Applied Science| 49|Scientific| 4781|97.57142857142857|
## | Head Office| 5| null| null| null|
## | Legal| 11| Admin| 9656|877.8181818181819|
## | Methodology| 27| Technical| 4644| 172.0|
## +---------------+---------+----------+------+-----------------+
.filter(condition) will filter a DataFrame depending on the condition set. The example below shows prices that are above 600. .where() can be used instead of .filter() - they are aliases of one another. The filter condition can be written in SQL, or using Python based column comparators.
Item_Price_DataFrame.filter(Item_Price_DataFrame['Price'] > 600).show() #Python based command
#Below will have the same effect.
#Item_Price_DataFrame.filter('Price > 600').show() #SQL based command
#Item_Price_DataFrame.filter(Item_Price_DataFrame.Price > 600).show() #Python based command
## +-----+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----+----+-----+---------------+--------+
## |Couch| 12| 1000| 900| 5|
## +-----+----+-----+---------------+--------+
Item_Price_DataFrame.filter(Item_Price_DataFrame.Price.between(40,120)).show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| 10|
## | Floor Lamp| 16| 60| 50| 1|
## +-----------+----+-----+---------------+--------+
department_data.where(department_data['budget_per_head'] > 100).show()
## +-----------+---------+---------+------+-----------------+
## | department|employees| division|budget| budget_per_head|
## +-----------+---------+---------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Methodology| 27|Technical| 4644| 172.0|
## +-----------+---------+---------+------+-----------------+
department_data.filter('budget_per_head > 100').show()
## +-----------+---------+---------+------+-----------------+
## | department|employees| division|budget| budget_per_head|
## +-----------+---------+---------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Methodology| 27|Technical| 4644| 172.0|
## +-----------+---------+---------+------+-----------------+
department_data.where(department_data.budget_per_head > 100).show()
## +-----------+---------+---------+------+-----------------+
## | department|employees| division|budget| budget_per_head|
## +-----------+---------+---------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Methodology| 27|Technical| 4644| 172.0|
## +-----------+---------+---------+------+-----------------+
Guidance
Columns can also be referenced with dataframe.column_name. If using dataframe.column_name, make sure there are no spaces in the column name! This is good practice in general, never put spaces in column names! Also be careful with column names that clash with DataFrame attributes, like dtypes, because if you do department_data.where(department_data.dtypes > 10), it will throw an error as it tries to use the attribute rather than the column!
Multiple conditions can be entered, via SQL or using Python. With SQL, all commands should be in one string, separated by and or or. If using Python style column comparison, each separate condition should be wrapped in brackets () and separated by & or | for and/or respectively.
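The reason each condition needs its own brackets is Python operator precedence: & binds more tightly than comparison operators, so without brackets Python groups the & before the comparisons. A quick plain-integer check:

```python
# & binds more tightly than comparisons, so Python parses
# `5 > 3 & 2` as `5 > (3 & 2)`, not `(5 > 3) & 2`.
assert (3 & 2) == 2
assert (5 > 3 & 2) == (5 > (3 & 2))

# With Spark Column objects, that mis-grouping typically raises an error,
# which is why each condition gets its own brackets:
# (col('Price') > 300) & (col('Code') > 3)
print('precedence check passed')
```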
Item_Price_DataFrame.filter((Item_Price_DataFrame['Price'] > 300) & (Item_Price_DataFrame['Code'] > 3)).show() #Python based command
#Below will have the same effect.
#Item_Price_DataFrame.filter('Price > 300 and Code > 3').show() #SQL based command
#Item_Price_DataFrame.filter((Item_Price_DataFrame.Price > 300) & (Item_Price_DataFrame.Code > 3)).show() #Python based command
## +-----+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----+----+-----+---------------+--------+
## |Couch| 12| 1000| 900| 5|
## +-----+----+-----+---------------+--------+
department_data.filter((department_data['budget_per_head'] > 100) & (department_data['employees'] < 15)).show()
## +----------+---------+--------+------+-----------------+
## |department|employees|division|budget| budget_per_head|
## +----------+---------+--------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## +----------+---------+--------+------+-----------------+
department_data.where('budget_per_head > 100 and employees < 15').show()
## +----------+---------+--------+------+-----------------+
## |department|employees|division|budget| budget_per_head|
## +----------+---------+--------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## +----------+---------+--------+------+-----------------+
Guidance
When using the Python style conditions, use & to mean and and | to mean or. If using the SQL syntax, just use the words!
.when(condition, value_if_true) and .otherwise(value_if_false) can be used together to conditionally change values in a DataFrame. If the data has a known error, it can be corrected easily as below, by overwriting the column with a new value if the condition is True, or keeping the original column value if the condition is False.
Conditions can be based on other columns in the DataFrame, in which case the value for that record will be assessed. .when(condition, value_if_true) can be used without .otherwise(value_if_false), and will just fill the records where the condition is False with nulls.
from pyspark.sql.functions import when
Item_Price_DataFrame.withColumn('Price', when(Item_Price_DataFrame["Price"] == 100, 99.99).otherwise(Item_Price_DataFrame["Price"])).show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22|99.99| 70| 10|
## |White Table| 3| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 499| 20|
## | Couch| 12| 1000| 900| 5|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------------+--------+
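The same conditional logic in plain Python is a ternary expression applied per record (toy rows mirroring the example above):

```python
rows = [{'Item': 'Black Chair', 'Price': 100}, {'Item': 'Couch', 'Price': 1000}]

# Analogue of when(Price == 100, 99.99).otherwise(Price):
rows = [{**r, 'Price': 99.99 if r['Price'] == 100 else r['Price']} for r in rows]

# Analogue of when(...) with no .otherwise(): non-matching records get None:
flags = [99.99 if r['Price'] == 99.99 else None for r in rows]

print(rows, flags)
```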
A correction has come through for the Applied Science department budget: it should actually be 6781. Create a new DataFrame with the correction, and display it to check it has gone through.
The budget per head is now incorrect, so redo the previous exercise to create a DataFrame with the correction, and check which departments now have a budget per head of more than 100.
from pyspark.sql.functions import when
department_data_corrected = department_data.withColumn('budget', when(department_data['department'] == 'Applied Science', 6781).otherwise(department_data['budget']))
department_data_corrected.show()
## +---------------+---------+----------+------+-----------------+
## | department|employees| division|budget| budget_per_head|
## +---------------+---------+----------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Applied Science| 49|Scientific| 6781|97.57142857142857|
## | Head Office| 5| null| null| null|
## | Methodology| 27| Technical| 4644| 172.0|
## +---------------+---------+----------+------+-----------------+
But this would also work for this specific example, as the Applied Science department is the only one with a previous budget of 4781:
department_data.withColumn('budget', when(department_data['budget'] == 4781, 6781).otherwise(department_data['budget']))
Once the corrected value is in, the budget per head can be recalculated:
department_data_corrected = department_data_corrected.withColumn('budget_per_head', col('budget') / col('employees'))
department_data_corrected.show()
## +---------------+---------+----------+------+-----------------+
## | department|employees| division|budget| budget_per_head|
## +---------------+---------+----------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Applied Science| 49|Scientific| 6781|138.3877551020408|
## | Head Office| 5| null| null| null|
## | Methodology| 27| Technical| 4644| 172.0|
## +---------------+---------+----------+------+-----------------+
Guidance
The first argument of .withColumn(), here budget, is the column you want to make changes in. Usually, this will be the same column as the one in the otherwise() section, as you want the default value to keep the original data. In the conditional when() statement, use any column that you want to run the logic against! when() statements can also be chained together - df.withColumn('name', when(condition_1, value_1).when(condition_2, value_2)...when(condition_x, value_x).otherwise(default_value))
We corrected the Applied Science budget successfully, but what if there was an Applied Science department within a different division? That would also get its budget changed! Instead of querying on one column, query on as many as you can to be as specific as possible - you could always put the condition in a filter block first, to check that only the record you care about will be changed!
department_data.withColumn('budget', when(((department_data['department'] == 'Applied Science') & (department_data['budget'] == 4781) & (department_data['division'] == 'Scientific')),6781).otherwise(department_data['budget'])).show()
## +---------------+---------+----------+------+-----------------+
## | department|employees| division|budget| budget_per_head|
## +---------------+---------+----------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Applied Science| 49|Scientific| 6781|97.57142857142857|
## | Head Office| 5| null| null| null|
## | Methodology| 27| Technical| 4644| 172.0|
## +---------------+---------+----------+------+-----------------+
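The chained when().otherwise() logic behaves like an ordered series of condition checks: the first condition that matches supplies the value, otherwise the default is used. A plain-Python sketch of that behaviour (not PySpark; the when_chain helper and sample rows are invented for illustration):

```python
# Plain-Python model of chained when().otherwise() logic (not PySpark):
# conditions are checked in order; the first match wins, else the default.
def when_chain(row, cases, default_col):
    for condition, value in cases:
        if condition(row):
            return value
    return row[default_col]

rows = [
    {'department': 'Applied Science', 'division': 'Scientific', 'budget': 4781},
    {'department': 'Legal', 'division': 'Admin', 'budget': 9656},
]

# one (condition, value) pair, matching on all three columns to be specific
cases = [
    (lambda r: r['department'] == 'Applied Science'
               and r['division'] == 'Scientific'
               and r['budget'] == 4781, 6781),
]

corrected = [when_chain(r, cases, 'budget') for r in rows]
print(corrected)  # [6781, 9656]
```

Only the Applied Science row matches every condition, so only its budget changes; the Legal row falls through to the default.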
The idea of null in Spark is similar to that in R and Python: it represents the absence of data, so it cannot be located with traditional conditions such as column == null. Instead, special methods must be used.
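The reason column == null cannot find nulls is SQL-style three-valued logic: comparing anything with null yields null (unknown) rather than True, so the filter never matches. A plain-Python model of the idea (not PySpark; sql_equals and is_null are invented for illustration):

```python
# Model of SQL three-valued equality (not PySpark):
# comparing anything with null gives null (unknown), not True.
def sql_equals(a, b):
    if a is None or b is None:
        return None          # unknown - a filter treats this as "not True"
    return a == b

print(sql_equals(None, None))  # None - even null == null is not True
print(sql_equals(5, 5))        # True

# what an .isNull()-style check does instead: test for absence directly
def is_null(a):
    return a is None

print(is_null(None))           # True
```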
There is a simple command, .isNull() which will reveal any nulls.
To check if there are nulls in the Outer Join DataFrame, the below can be used.
Item_outer_join.filter(Item_outer_join["Price"].isNull()).show()
#using the SQL style method
#Item_outer_join.filter('Price is null').show()
## +--------+----+-----+---------------+--------+----+---------+
## | Item|Code|Price|Last_Year_Price|Quantity|Code|Defective|
## +--------+----+-----+---------------+--------+----+---------+
## |Red Door|null| null| null| null| 14| 41|
## +--------+----+-----+---------------+--------+----+---------+
Likewise, to find records where the value is not null, .isNotNull() can be used.
Item_outer_join.filter(Item_outer_join["Price"].isNotNull()).show()
#Item_outer_join.filter('Price is not null').show()
## +-----------+----+-----+---------------+--------+----+---------+
## | Item|Code|Price|Last_Year_Price|Quantity|Code|Defective|
## +-----------+----+-----+---------------+--------+----+---------+
## |Black Chair| 22| 100| 70| 10| 22| 4|
## |White Table| 3| 500| 350| 50| 3| 15|
## | Floor Lamp| 16| 60| 50| 1|null| null|
## |White Table| 3| 500| 499| 20| 3| 15|
## | Couch| 12| 1000| 900| 5| 12| 3|
## |White Table| 3| 500| 499| 20| 3| 15|
## +-----------+----+-----+---------------+--------+----+---------+
Head Office had some missing data. Use .filter() and .isNotNull() to create a new DataFrame containing only records where we have both employment and budget data.
department_data_no_null = department_data.filter((department_data['budget'].isNotNull() & department_data['employees'].isNotNull()))
department_data_no_null.show()
## +---------------+---------+----------+------+-----------------+
## | department|employees| division|budget| budget_per_head|
## +---------------+---------+----------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Applied Science| 49|Scientific| 4781|97.57142857142857|
## | Methodology| 27| Technical| 4644| 172.0|
## +---------------+---------+----------+------+-----------------+
Nulls can be handled in two main ways: they can be replaced, or they can be dropped. There are two ways in which they can be replaced. The first, using the method in 5.6, is to use other columns to impute missing data. The second involves blanket replacing nulls with other values using .fillna().
.fillna(value, subset = None) can be used to blanket replace nulls with a specified value, over all columns if subset = None, or over a set of columns if subset = [cols]. The value can either be a single value for all columns, or a dictionary of column_name : value pairs. If a subset of columns is used, be careful that the fill value is of the same data type as the subsetted columns, else it will be ignored. If a dictionary of column_name : value pairs is used, then subset is ignored.
Item_Price_Nulls = spark.read.csv('./data/Item_Price_Nulls.csv', header = True)
Item_Price_Nulls.show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| null|
## |White Table|null| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| null| 499| 20|
## | Couch| 12| null| 900| null|
## |White Table| 3| 500| 499| 20|
## | Sofa|null| null| null| null|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.fillna('fill_value').show()
## +-----------+----------+----------+---------------+----------+
## | Item| Code| Price|Last_Year_Price| Quantity|
## +-----------+----------+----------+---------------+----------+
## |Black Chair| 22| 100| 70|fill_value|
## |White Table|fill_value| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3|fill_value| 499| 20|
## | Couch| 12|fill_value| 900|fill_value|
## |White Table| 3| 500| 499| 20|
## | Sofa|fill_value|fill_value| fill_value|fill_value|
## +-----------+----------+----------+---------------+----------+
Item_Price_Nulls.fillna(999, subset=['Code']).show()
#Doesn't work as column type of 'Code' is string, not int.
#Subset ignores when column type doesn't match.
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| null|
## |White Table|null| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| null| 499| 20|
## | Couch| 12| null| 900| null|
## |White Table| 3| 500| 499| 20|
## | Sofa|null| null| null| null|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.fillna('999', subset=['Code']).show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| null|
## |White Table| 999| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| null| 499| 20|
## | Couch| 12| null| 900| null|
## |White Table| 3| 500| 499| 20|
## | Sofa| 999| null| null| null|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.fillna({'Code': 999,
'Price': 0,
'Quantity':0}).show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| 0|
## |White Table| 999| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 0| 499| 20|
## | Couch| 12| 0| 900| 0|
## |White Table| 3| 500| 499| 20|
## | Sofa| 999| 0| null| 0|
## +-----------+----+-----+---------------+--------+
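The rules above (subset selects columns, a type-mismatched fill value is ignored, a dictionary of column : value pairs ignores subset) can be sketched in plain Python. This is a model of the documented behaviour, not PySpark; the fillna helper and col_types parameter are invented for illustration:

```python
# Plain-Python model of the .fillna() rules described above (not PySpark):
def fillna(rows, value, subset=None, col_types=None):
    if isinstance(value, dict):
        targets = value            # a dict of column: value pairs ignores subset
    else:
        cols = subset if subset is not None else list(rows[0])
        targets = {c: value for c in cols}
    filled = []
    for row in rows:
        new_row = dict(row)
        for col, fill in targets.items():
            # a fill value of the wrong type for the column is ignored
            if col_types and not isinstance(fill, col_types[col]):
                continue
            if new_row.get(col) is None:
                new_row[col] = fill
        filled.append(new_row)
    return filled

rows = [{'Code': None, 'Price': None}]
types = {'Code': str, 'Price': str}     # CSV columns are read in as strings

print(fillna(rows, 999, subset=['Code'], col_types=types))
# [{'Code': None, 'Price': None}] - int 999 ignored for a string column
print(fillna(rows, '999', subset=['Code'], col_types=types))
# [{'Code': '999', 'Price': None}]
```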
.fillna(value) is an alias of .na.fill(value). Either can be used depending on personal preference.
The Head Office record still has nulls, so let's fill it. Create a new DataFrame with the nulls filled, with division set to Admin, budget set to 6262, and budget_per_head set to 1252.4.
fill_dictionary = {'division' : 'Admin', 'budget' : 6262, 'budget_per_head' : 1252.4}
department_data.fillna(fill_dictionary)
## DataFrame[department: string, employees: bigint, division: string, budget: int, budget_per_head: double]
To drop nulls, the above method of column.isNotNull() can be used, to return only records with non-null values in a specific column. .dropna() can also be used, and offers more functionality.
.dropna(how='any', thresh=None, subset=None) can be used to completely drop records containing nulls. The parameter how can either drop rows with any nulls, or only rows which are all null. If thresh is set to an integer, records with fewer than thresh non-null values will be dropped, overriding the how parameter. As with fillna(), a subset of columns can be specified, excluding columns where null values aren’t an issue.
Item_Price_Nulls.show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| null|
## |White Table|null| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| null| 499| 20|
## | Couch| 12| null| 900| null|
## |White Table| 3| 500| 499| 20|
## | Sofa|null| null| null| null|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.dropna(how='any').show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.dropna(how='all').show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| null|
## |White Table|null| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| null| 499| 20|
## | Couch| 12| null| 900| null|
## |White Table| 3| 500| 499| 20|
## | Sofa|null| null| null| null|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.dropna(how='all', subset = Item_Price_Nulls.columns[1:]).show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| null|
## |White Table|null| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| null| 499| 20|
## | Couch| 12| null| 900| null|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.dropna(thresh = 3).show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| null|
## |White Table|null| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| null| 499| 20|
## | Couch| 12| null| 900| null|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------------+--------+
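The how/thresh/subset rules demonstrated above can be summed up in a plain-Python sketch (a model of the behaviour, not PySpark; the drop_nulls helper and sample rows are invented for illustration):

```python
# Plain-Python model of .dropna(how, thresh, subset) (not PySpark):
# thresh, when set, overrides how - keep rows with at least `thresh` non-nulls.
def drop_nulls(rows, how='any', thresh=None, subset=None):
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        non_null = sum(row[c] is not None for c in cols)
        if thresh is not None:
            keep = non_null >= thresh
        elif how == 'any':
            keep = non_null == len(cols)   # no nulls at all
        else:                              # how == 'all'
            keep = non_null > 0            # drop only fully-null rows
        if keep:
            kept.append(row)
    return kept

rows = [
    {'Item': 'Black Chair', 'Code': '22', 'Price': '100'},  # complete
    {'Item': 'Couch', 'Code': None, 'Price': None},         # partly null
    {'Item': None, 'Code': None, 'Price': None},            # fully null
]
print(len(drop_nulls(rows, how='any')))   # 1
print(len(drop_nulls(rows, how='all')))   # 2
print(len(drop_nulls(rows, thresh=1)))    # 2
```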
As with .fillna(), the alias .na.drop(how) can be used instead of .dropna().
Use .dropna() to remove the Head Office record from the joined employment-budget DataFrame.
department_data.dropna(how = 'any').show() #drop any record with any nulls
## +---------------+---------+----------+------+-----------------+
## | department|employees| division|budget| budget_per_head|
## +---------------+---------+----------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Applied Science| 49|Scientific| 4781|97.57142857142857|
## | Methodology| 27| Technical| 4644| 172.0|
## +---------------+---------+----------+------+-----------------+
department_data.dropna('any').show()
## +---------------+---------+----------+------+-----------------+
## | department|employees| division|budget| budget_per_head|
## +---------------+---------+----------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Applied Science| 49|Scientific| 4781|97.57142857142857|
## | Methodology| 27| Technical| 4644| 172.0|
## +---------------+---------+----------+------+-----------------+
department_data.dropna().show()
## +---------------+---------+----------+------+-----------------+
## | department|employees| division|budget| budget_per_head|
## +---------------+---------+----------+------+-----------------+
## | Legal| 11| Admin| 9656|877.8181818181819|
## |Applied Science| 49|Scientific| 4781|97.57142857142857|
## | Methodology| 27| Technical| 4644| 172.0|
## +---------------+---------+----------+------+-----------------+
Guidance
The defaults for dropna() are how = 'any' - drop any row with any nulls. As such, you can just do df.dropna() to drop the required record!
When data has completely identical records, .dropDuplicates() can be used to leave only one copy of each record.
From this:
Item_Price_DataFrame.show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| 10|
## |White Table| 3| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 499| 20|
## | Couch| 12| 1000| 900| 5|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------------+--------+
To this:
Item_Price_DataFrame.dropDuplicates().show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |White Table| 3| 500| 350| 50|
## | Couch| 12| 1000| 900| 5|
## |White Table| 3| 500| 499| 20|
## | Floor Lamp| 16| 60| 50| 1|
## |Black Chair| 22| 100| 70| 10|
## +-----------+----+-----+---------------+--------+
.dropDuplicates(), or the similar .distinct(), can be used to find the count of distinct records in one column.
print(Item_Price_DataFrame.select('Item').distinct().count())
## 4
What is the count of records in the DataFrame you created in 4.5 Unioning DataFrames - unioned_employees? What should the count be after dropping duplicates?
Run .count() on the DataFrame before and after dropping duplicates, and compare the results to your expectations.
There should be 8 records in the unioned dataframe. After dropping duplicates, there should only be 4.
pre_count = unioned_employees.count()
dropped_employees = unioned_employees.dropDuplicates()
post_count = dropped_employees.count()
dropped_employees.show()
## +---------------+---------+
## | department|employees|
## +---------------+---------+
## | Head Office| 5|
## |Applied Science| 49|
## | Legal| 11|
## | Methodology| 27|
## +---------------+---------+
print('before:', pre_count, ' after:', post_count)
## before: 8 after: 4
Aggregating records together allows us to utilise data at a higher level. groupby() can take any number of columns, and will group the data for each unique subset of those columns.
Count how many different items have the same price
Item_Price_DataFrame.groupBy("Price").count().show()
## +-----+-----+
## |Price|count|
## +-----+-----+
## | 100| 1|
## | 1000| 1|
## | 60| 1|
## | 500| 3|
## +-----+-----+
.agg() takes a dictionary of column names and aggregation methods - sum, max, avg, std etc.
Item_Price_DataFrame.groupby('Item').agg({'Quantity':'sum'}).show()
## +-----------+-------------+
## | Item|sum(Quantity)|
## +-----------+-------------+
## |Black Chair| 10.0|
## | Couch| 5.0|
## | Floor Lamp| 1.0|
## |White Table| 90.0|
## +-----------+-------------+
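The group-then-aggregate pattern can be modelled in a few lines of plain Python (a sketch of the logic, not PySpark; the group_sum helper is invented for illustration):

```python
# Plain-Python sketch of groupby + sum aggregation (not PySpark):
from collections import defaultdict

def group_sum(rows, key, value):
    totals = defaultdict(float)
    for row in rows:
        totals[row[key]] += row[value]   # accumulate per unique key
    return dict(totals)

rows = [
    {'Item': 'White Table', 'Quantity': 50},
    {'Item': 'White Table', 'Quantity': 20},
    {'Item': 'White Table', 'Quantity': 20},
    {'Item': 'Black Chair', 'Quantity': 10},
]
print(group_sum(rows, 'Item', 'Quantity'))
# {'White Table': 90.0, 'Black Chair': 10.0}
```

In Spark, the same grouping happens in parallel across partitions, with per-partition partial sums combined at the end.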
Find the total budget for each division using .groupby() and .agg(). Use the original budget DataFrame, rather than the one we have been applying transformations to.
divisional_budgets = department_budget.groupby('division').agg({'budget':'sum'})
divisional_budgets.show()
## +-----------+------------+
## | division| sum(budget)|
## +-----------+------------+
## |Engineering| 45665|
## | Admin| 71289|
## | Scientific| 40068|
## | Technical| 43904|
## +-----------+------------+
Summary overviews of columns can be found with .describe(). It can be run with no arguments to display details on all columns, or be passed a single column name, or a list of columns, to describe only a few columns.
This returns the count of non-null values in each column, so it can be used to find columns which contain nulls by comparing the counts to the overall count of the DataFrame.
Item_Price_DataFrame.describe().show()
## +-------+-----------+-----------------+------------------+-----------------+------------------+
## |summary| Item| Code| Price| Last_Year_Price| Quantity|
## +-------+-----------+-----------------+------------------+-----------------+------------------+
## | count| 6| 6| 6| 6| 6|
## | mean| null|9.833333333333334| 443.3333333333333|394.6666666666667|17.666666666666668|
## | stddev| null|8.134289561274953|341.85767018843774| 317.279477222632|17.625738755203045|
## | min|Black Chair| 12| 100| 350| 1|
## | max|White Table| 3| 60| 900| 50|
## +-------+-----------+-----------------+------------------+-----------------+------------------+
Item_Price_DataFrame.describe('Code').show()
## +-------+-----------------+
## |summary| Code|
## +-------+-----------------+
## | count| 6|
## | mean|9.833333333333334|
## | stddev|8.134289561274953|
## | min| 12|
## | max| 3|
## +-------+-----------------+
Note:
As min/max can be performed on a set of strings, returning the first and last alphabetically, the min/max results for the numerical columns Code and Price are the alphabetical, not numerical, extremes. Mean and standard deviation cannot be performed on strings, so the columns are implicitly cast to numbers and the true results found.
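This alphabetical behaviour can be reproduced with Python's built-in min and max on the Code column's values (plain Python, not PySpark):

```python
# The Code column values, as read from CSV (strings) vs cast to numbers.
codes = ['22', '3', '16', '3', '12', '3']

# String comparison is alphabetical: '1' < '2' < '3', so '12' sorts first
# and '3' sorts last - matching the describe() output above.
print(min(codes), max(codes))            # 12 3

# After casting to numbers, min/max behave as expected.
as_numbers = [float(c) for c in codes]
print(min(as_numbers), max(as_numbers))  # 3.0 22.0
```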
department_budget.describe('budget').show()
## +-------+------------------+
## |summary| budget|
## +-------+------------------+
## | count| 33|
## | mean| 6088.666666666667|
## | stddev|2606.9864267323424|
## | min| 114|
## | max| 9724|
## +-------+------------------+
When reading data in from an HDFS table, the schema of the table should correctly set column types of the newly created DataFrame, to those of the original table. When reading in from a CSV, this is not true, and every column will be read in as a string, unless the optional parameter inferSchema = True is set.
Incorrect column types can cause strange issues to appear when handling the column as a whole, or when finding summaries of columns. To fix this, .cast() can be used to convert columns from one data type to another. The required type is imported first from pyspark.sql.types, then a column is overwritten with a version converted to a Double.
print(Item_Price_DataFrame.dtypes)
## [('Item', 'string'), ('Code', 'string'), ('Price', 'string'), ('Last_Year_Price', 'string'), ('Quantity', 'string')]
from pyspark.sql.types import DoubleType
Item_Price_Cast = Item_Price_DataFrame.withColumn('Code', Item_Price_DataFrame['Code'].cast(DoubleType()))
print(Item_Price_Cast.dtypes)
## [('Item', 'string'), ('Code', 'double'), ('Price', 'string'), ('Last_Year_Price', 'string'), ('Quantity', 'string')]
Item_Price_Cast.describe('Code').show()
## +-------+-----------------+
## |summary| Code|
## +-------+-----------------+
## | count| 6|
## | mean|9.833333333333334|
## | stddev|8.134289561274953|
## | min| 3.0|
## | max| 22.0|
## +-------+-----------------+
The data type is now set to a double rather than a string, the output values are the same (with a .0 as it's been made a double), and describe correctly shows the right min and max for the Code column.
Note:
If your data is already a double, convert it into an integer!
from pyspark.sql.types import DoubleType
print(department_budget.dtypes)
## [('department', 'string'), ('division', 'string'), ('budget', 'int')]
department_budget_cast = department_budget.withColumn('budget', department_budget['budget'].cast(DoubleType()))
department_budget_cast.show(5)
## +-------------+---------+------+
## | department| division|budget|
## +-------------+---------+------+
## | HR| Admin|7852.0|
## | Finance| Admin|8541.0|
## | Legal| Admin|9656.0|
## |International| Admin|1913.0|
## | IT Services|Technical|7420.0|
## +-------------+---------+------+
## only showing top 5 rows
print(department_budget_cast.dtypes)
## [('department', 'string'), ('division', 'string'), ('budget', 'double')]
Guidance
You can also pass .cast() the name of the datatype as a string:
department_budget.withColumn('budget', department_budget['budget'].cast('double')).show(5)
## +-------------+---------+------+
## | department| division|budget|
## +-------------+---------+------+
## | HR| Admin|7852.0|
## | Finance| Admin|8541.0|
## | Legal| Admin|9656.0|
## |International| Admin|1913.0|
## | IT Services|Technical|7420.0|
## +-------------+---------+------+
## only showing top 5 rows
Now, you should be able to perform Data querying and manipulation. You should be able to:
Remove, Add, Rename a column
Filter on a condition of a column and on multiple columns
Replace values dependent on conditions
Check for nulls
Drop Duplicates
Aggregate Data
Do Summary Statistics
Intended Learning Outcomes:
By the end of this Chapter, you should be able to:
Use .repartition() and .coalesce() to obtain the optimal partitioning for your DataFrame and the cores you have available. You need to understand what the optimal partitioning is.
Save data in a json or csv format.
If you are using a relatively small set of data, partitioning can generally be left to Spark to handle. If however, you have a very, very large set of data, proper partitioning is the key difference between your analysis succeeding promptly, and your processes crashing as soon as it gets a glimpse of the data.
For large sets of data, the importance of partitioning cannot be overstated. Within DataBricks Community Edition, you have access to 8 cores. Within the ONS Data Service, you can specify the number of cores you require. When thinking about partitioning, the primary aim should be to have our data equally distributed between the cores.
If we have 8 cores, and 1 million records, it stands to reason that we want 125,000 records on each core. Unbalanced partitioning can lead to massive slow down. We want the number of partitions to be a multiple of the number of cores, to ensure that every core is being used equally.
In the first diagram below, if we assume that each partition takes 10 seconds for a task to happen, the total time will be 20 seconds - 10 seconds for the first 8 partitions, and 10 seconds for the last 2. During the last 2, 6 cores are sitting idle.
In the second diagram, with half as many partitions, each twice as large as before, the processing will take 20 seconds, but 3 cores will sit idle for the entire time.
In the final diagram, with the data equally split into 8 partitions, each partition will take 12.5 seconds to process. As each core has only 1 partition, the overall processing time will be 12.5 seconds, with no idle cores.
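The timing argument in the diagrams can be checked with a small model: each core works through its partitions in sequence, and the job finishes when the slowest core does. This is a plain-Python sketch (the job_time helper is invented for illustration, not how Spark actually schedules tasks):

```python
# Model of the diagrams above: partitions go to the least-loaded core,
# and the job finishes when the busiest core finishes.
def job_time(partition_times, cores):
    core_loads = [0.0] * cores
    for t in partition_times:
        core_loads[core_loads.index(min(core_loads))] += t
    return max(core_loads)

print(job_time([10] * 10, 8))    # 20.0 - 10 partitions on 8 cores, 6 idle at the end
print(job_time([20] * 5, 8))     # 20.0 - 5 double-size partitions, 3 cores always idle
print(job_time([12.5] * 8, 8))   # 12.5 - 8 balanced partitions, no idle cores
```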
In general, 200MB per partition is recommended. When considering whether to reduce or increase partitions to reach a multiple of the number of cores, consider the size of each partition and use that to decide.
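These two rules of thumb (roughly 200MB per partition, and a partition count that is a multiple of the core count) can be combined into a rough calculation. The suggest_partitions helper below is invented for illustration, a heuristic sketch rather than anything Spark provides:

```python
# Rough partition-count heuristic: aim for ~200MB per partition,
# rounded up to a multiple of the core count so no core sits idle.
def suggest_partitions(data_size_mb, cores, target_mb=200):
    raw = max(1, round(data_size_mb / target_mb))
    # round up to the nearest multiple of the core count
    return ((raw + cores - 1) // cores) * cores

print(suggest_partitions(2000, 8))   # 16 - 2GB split over 8 cores
print(suggest_partitions(100, 8))    # 8  - small data still uses every core
```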
If you are unaware of the size of your DataFrame, you can look at the original data stored within HUE, which should be able to tell you the rough filesize. Or, you can cache the DataFrame, and use the Spark UI to find the size of the cached data.
To cache the data, do the following:
Item_Price_DataFrame.cache().count()
## 6
The count() action is needed to make the cache() transformation occur.
Then, navigate to the Spark UI, and go to the storage tab. You should be able to see the DataFrame you cached, as well as the number of partitions it is stored in, and the amount of memory it takes up. You can use this to decide on a more useful number of partitions.
The Spark UI can be found within databricks by clicking on the currently attached cluster button in a notebook, and selecting Spark UI. Within DAP, open a CDSW workbench session, create a SparkSession object, then click the icon in the top right of the workbench, with a three-by-three grid of rectangles on it, and select Spark UI from the dropdown menu.
.repartition()
.repartition(n) will return a new DataFrame with n partitions. It can both increase, and decrease, the number of partitions.
dataframe.repartition(8), for example, will convert a DataFrame from an unknown number of partitions to 8, with a relatively uniform distribution of data across them.
.repartition(n, cols) will repartition a DataFrame into n partitions, but also keep records with the same values in cols on the same partition. For example, if you have a Year column, all records with the same Year value will be placed on the same partition. With a bit of thought about the future transformations being performed on the data, efficiency can be gained further down any processing pipeline by partitioning well at an early stage.
.coalesce()
.coalesce(n) will return a DataFrame with n partitions, or the current number, whichever is lower. As such, it can only be used to reduce the number of partitions.
It cannot guarantee an even distribution of records across all partitions, as it does not shuffle data, only combines existing partitions. Using repartition to reduce the number of partitions will result in a more even distribution of data, at the cost of an expensive shuffle operation - moving data between partitions.
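The difference can be sketched in plain Python (a model of the contrast, not Spark's real implementation; the coalesce and repartition helpers are invented for illustration):

```python
# Sketch of the difference (not Spark's real implementation):
# coalesce combines whole partitions without moving individual records,
# repartition shuffles records to produce evenly sized partitions.
def coalesce(partitions, n):
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i % n].extend(part)       # whole partitions are combined
    return merged

def repartition(records, n):
    parts = [[] for _ in range(n)]
    for i, record in enumerate(records):
        parts[i % n].append(record)      # records redistributed one by one
    return parts

partitions = [[1, 2, 3, 4], [5], [6], [7]]
print([len(p) for p in coalesce(partitions, 2)])   # [5, 2] - uneven, but cheap
records = [r for part in partitions for r in part]
print([len(p) for p in repartition(records, 2)])   # [4, 3] - balanced, but shuffled
```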
.rdd Methods
There are some especially useful RDD functions for partitioning. To find the number of partitions, use df.rdd.getNumPartitions(), and to get a breakdown of the number of records on each partition, use df.rdd.glom().map(len).collect()
print(Item_Price_DataFrame.rdd.getNumPartitions())
## 1
df.rdd.glom().map(len).collect() is less obvious. .glom() is used to treat entire partitions as arrays. Any function applied to a glommed RDD will apply partition-by-partition, so when we map the len (length) of each partition, and collect the result, we get the length of each partition returned as a list.
Item_Price_DataFrame_3Part = Item_Price_DataFrame.repartition(3)
print(Item_Price_DataFrame_3Part.rdd.glom().map(len).collect())
## [2, 2, 2]
Calling .write.csv(filepath, sep, mode) will write a DataFrame to the specified location filepath, using sep as the column separator. Setting mode to 'overwrite' allows pyspark to write multiple times, discarding the existing version each time.
By default this will write to as many CSV files as you have partitions. This can be avoided by coalescing the data into one partition first, then writing to CSV. Be warned - all your data will be brought to the same partition, so it must be small enough to fit on one.
The filepath listed will be a directory, inside of which will be the actual data in as many CSV files as you have partitions, each file starting with the word ‘part’, and 3 meta-data files, stating whether the write process has been successful, when it began, and when it was last modified.
However, Spark is intelligent enough that read.csv() can take the directory path only, read in every part file within it, and return the entire collection of data as a partitioned DataFrame. This is useful if you want to store a DataFrame temporarily for reuse within PySpark, but having data split into multiple files is less useful for external use.
Item_Price_DataFrame.write.csv('/folder/Item_Price_csv', sep = ',', mode = 'overwrite') #writes to the same number of csv files as partitions
Item_Price_DataFrame.coalesce(1).write.csv('Item_Price_Coalesced', sep = ',', mode = 'overwrite') #coalesces to one partition, writes to one csv.
Save the department_employees DataFrame as a CSV in a folder called intro_pyspark. Ensure it is on one partition first.
Databricks has no convenient way of viewing data that is not stored in a table. Instead, all files can be viewed by pressing the Data icon on the left bar, going to Add Data as you did to upload files previously, but then clicking on the DBFS tab rather than remaining on the default Upload File tab. From here, all the files can be found. Uploaded CSV files will be in FileStore/tables/filename.csv, while files you have saved will be at the top level - or follow any directory structure you give them. The below answers, for example, will store the results in a folder called intro_pyspark.
department_employees.coalesce(1).write.csv('intro_pyspark/department_employees_csv', mode = 'overwrite')
Compare the results of this to if you do:
department_employees.write.csv('intro_pyspark/department_employees_8_csv', mode = 'overwrite')
Guidance
Inside the output directory you will find three metadata files (their names starting with _). These mark the start of the write process (_started_), the last change to the write process (_committed_), and the overall status (hopefully _SUCCESS). You will also find part files, with incrementing ids; these represent single partitions of the original DataFrame, so the coalesced write produces a single part file.
Calling .write.save(file, format) on a DataFrame will save to the specified location in the specified format. As with CSVs, coalescing to 1 partition will ensure you have one output json, rather than a number equal to the number of partitions.
Item_Price_DataFrame.coalesce(1).write.save("Item_Price_json",format="json")
There are many different ways of writing data out to a table, with a huge amount of parameters which can be played with. Different teams will have different preferred ways of writing data out, some of which are shown below.
To write using raw SQL commands, the destination table must first exist, and then can be written into.
sql_create_table = 'CREATE TABLE IF NOT EXISTS item_price_data(Item varchar(255), Code varchar(255), Price varchar(255), LastYearPrice varchar(255), Quantity varchar(255))'
spark.sql(sql_create_table)
Once an empty table has been created, it can be populated by creating a temporary view of your data using .createOrReplaceTempView('tablename'), and then writing that to the table.
Item_Price_DataFrame.createOrReplaceTempView('tempTableView')
sql_write_data = 'INSERT OVERWRITE TABLE item_price_data SELECT * FROM tempTableView'
spark.sql(sql_write_data)
INSERT OVERWRITE from the SQL above will replace any existing data with the new data.
A second way of writing to tables is using a similar format as to write as a CSV, using df.write.format().mode().saveAsTable().
Within the format(), pass in a string referring to how the underlying data should be formatted (e.g. parquet, avro, json), within mode() pass in a string referring to how the new data should be inserted (e.g. append, overwrite), and within the saveAsTable(), pass in the name of the desired table.
Item_Price_DataFrame.write.format('parquet').mode('overwrite').saveAsTable('item_price_data') #Overwrite existing data. Schema does not need to match, new data schema will overwrite existing.
Note:
Parquet: Optimised for efficient queries, data parsing.
Avro: Good for large binary data, or where data users want access to entire single records at once.
JSON: Good for when data is distributed across a lot of small files, or for aggregate data.
Method 1 - SQL:
create_table = 'CREATE TABLE IF NOT EXISTS department_employees_sql(department VARCHAR(255), employees INTEGER)'
store_data = 'INSERT OVERWRITE TABLE department_employees_sql SELECT * FROM temp_table_view'
spark.sql(create_table)
department_employees.createOrReplaceTempView('temp_table_view')
spark.sql(store_data)
Method 2 - Python:
# overwrite existing data, store in table 'department_employees'
department_employees.write.format('parquet').mode('overwrite').saveAsTable('department_employees')
A written HDFS table should preserve the number of partitions in the data being written to it - repartitioning before writing should set the number of partitions and distribution of data, and preserve that in the HDFS table. Reading the data back in should see the same distribution, but empty partitions might be lost as seen in the diagram below, which shows some of the vagaries of repartitioning.
Appending data to an existing partitioned HDFS table might, however, not work exactly as expected: partitions might not hold the exact records expected when the new data is combined with the existing table data. To get around this, the original data could be read in, unioned in PySpark with the new data, repartitioned to force partitioning, and the original overwritten.
Now, you should be able to:
Use .repartition() and .coalesce() to obtain the optimal partitioning for your DataFrame and the cores you have available. You need to understand what the optimal partitioning is.
Save data in a json or csv format.
Intended Learning Outcomes:
By the end of this Chapter, you should be able to:
Take a sample from the Data
Convert Spark DataFrames into pandas DataFrame
Make a simple scatterplot in Python.
Remember, we are using Spark because the data is really big. To visualize the data we need to take a sample and convert it to a pandas DataFrame. The motivation is that converting to a pandas DataFrame brings all the data back to one core, and the full dataset is too big to allow for that. Thus, we take a sample of the data, convert it to pandas, and then work normally, as we would in traditional Python.
The Item_Price_DataFrame is shown below:
Item_Price_DataFrame.show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| 10|
## |White Table| 3| 500| 350| 50|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 499| 20|
## | Couch| 12| 1000| 900| 5|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------------+--------+
.sample(withReplacement, fraction, seed=None): returns a sampled subset of this DataFrame, but it does not guarantee to provide exactly the fraction specified of the total count of the given DataFrame.
withReplacement : if True, a row can be sampled more than once; if False, each row can be sampled at most once.
fraction: sampling fraction of the data
seed : random seed
Item_Price_DataFrame_Sample = Item_Price_DataFrame.sample(withReplacement = False, fraction = 0.5, seed = 1)
Item_Price_DataFrame_Sample.show()
## +-----------+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair| 22| 100| 70| 10|
## | Floor Lamp| 16| 60| 50| 1|
## |White Table| 3| 500| 499| 20|
## +-----------+----+-----+---------------+--------+
.toPandas() : returns the contents of the DataFrame as a pandas DataFrame. It is only available if pandas is installed and available.
Item_Price_Pandas_DataFrame = Item_Price_DataFrame_Sample.toPandas()
print(Item_Price_Pandas_DataFrame)
## Item Code Price Last_Year_Price Quantity
## 0 Black Chair 22 100 70 10
## 1 Floor Lamp 16 60 50 1
## 2 White Table 3 500 499 20
A pandas DataFrame can be displayed with print(df_pd) or df_pd.head().
sample_budget = department_budget.sample(withReplacement = False, fraction = 0.5, seed = 1)
budget_pandas = sample_budget.toPandas()
print(budget_pandas)
## department division budget
## 0 HR Admin 7852
## 1 Legal Admin 9656
## 2 International Admin 1913
## 3 R&D Scientific 8389
## 4 Systems Engineering Engineering 6564
## 5 Transit Engineering 7069
## 6 Autonomy Engineering 4060
## 7 Air Engineering 8712
## 8 Maritime Engineering 8480
## 9 Customer Support Admin 8293
## 10 Executive Admin 6967
## 11 Data Science Scientific 7603
## 12 Radio Communications Scientific 3370
## 13 Economic Statistics Technical 9629
## 14 Marketing Admin 4989
## 15 Survey Technical 4665
## 16 Deep Learning Scientific 8420
## 17 Graphics Technical 9724
Different methods are needed to display plots in DAP and in Databricks.
import matplotlib.pyplot as plt
plt.scatter(Item_Price_Pandas_DataFrame['Price'], Item_Price_Pandas_DataFrame['Last_Year_Price'])
plt.xlabel('Price')
plt.ylabel('Last Year Price')
plt.title('Price Vs Last Year Price')
plt.show()
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.plot(Item_Price_Pandas_DataFrame['Price'], Item_Price_Pandas_DataFrame['Last_Year_Price'], 'ko')
display(fig)
Now, you should be able to:
Take a sample from the Data
Convert Spark DataFrames into pandas DataFrame
Make a simple scatterplot in Python.
Intended Learning Outcomes:
By the end of this Chapter, you should understand:
The ONS Data Service contains multiple interconnected systems for the storage and manipulation of data, of which the Cloudera Data Science Workbench (CDSW) forms a core part. CDSW acts as an Integrated Development Environment (IDE) for Python, R and Scala development, and connects to the required number of executors for distributed processing.
Data can be found within HUE (Hadoop User Experience) which acts as an interface for the data stored within the ONS Data Service, from CSV files to database tables.
When a session is started within the workbench, an engine profile is specified, determining the number of vCPUs and the memory of the driver. As resources are finite, it is recommended to use the smallest profile that fulfils your requirements. From within the user's code, a SparkSession can be created, with the number of executors and the size of each executor specified. This can then be used to read, process, and export data within HUE.
To fully utilise the capabilities of distributed programming, it is generally preferable to use a large number of smaller executors, rather than a smaller number of large ones.
An example SparkSession can be found below, with a dynamic number of executors, each with 4 cores and 4 GB of memory.
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName('project_name')
.config('spark.executor.memory', '4g')
.config('spark.executor.cores', 4)
.config('spark.shuffle.service.enabled', 'true')
.config('spark.dynamicAllocation.enabled', 'true')
.config('spark.dynamicAllocation.maxExecutors', 8)
.enableHiveSupport()
.getOrCreate()
)
.appName() allows for a name to be defined for the SparkSession to differentiate it from others.
spark.executor.memory is a config setting which defines the amount of memory per executor process.
spark.executor.cores defines the number of cores available for the executor process.
.enableHiveSupport() allows connectivity to Hive’s store of metadata - Hive is a database within HUE.
.getOrCreate() will either create the specified SparkSession, or get an existing SparkSession.
Full details of config settings can be found here: http://spark.apache.org/docs/latest/configuration.html.
As mentioned above in the data ingest section, pathing for files is important within the ONS Data Service. For SQL tables a simple database_name.table_name will do, while for CSV or JSON files the full path, as below, is recommended.
data_from_HUE_table = spark.sql('SELECT * FROM database_name.table_name')
data_from_CSV_file = spark.read.csv('hdfs://prod1/dapsen/landing_zone/ons/mbs/mbs_120219.csv', header=True)
Now, you should understand:
Intended Learning Outcomes:
By the end of this Chapter you should:
Feel confident to deal with Data using pyspark.
Be able to combine what we learnt in the previous Chapters to investigate and manipulate Data.
Be able to investigate the current partitioning of your DataFrame, calculate the optimal partition based on your DataFrame and your resources and make the necessary changes to achieve that.
You have been given two subsets of a huge dataset that contain the Travel to Work Areas for the United Kingdom. You have been assigned the task of providing some basic descriptive analysis of the Travel to Work Areas, including the number of Lower Super Output Areas within each Travel to Work Area.
The main aim is to find the number of Small Areas within each Travel to Work Area.
You may need to consider the following:
What the two subsets are and how to join them.
What kind of join you want to do and why.
Check for any duplicates and/or missing values. Can any missing values be imputed?
Note:
Any errors, duplicated data, or missing values have been added as part of this course and do not reflect the state of the original data.
Travel_to_Work_Areas: Defined by an area in which the working population is over 3500, and over 75% of those who work in the area live in the area, and over 75% of those who live in the area work in the area.
Small_Area_Code: Lower Layer Super Output Area (LSOA) for England and Wales, Data Zone for Scotland, Super Output Area for Northern Ireland
Small_Area_Name: Lower Layer Super Output Area (LSOA) for England and Wales, Data Zone for Scotland, Super Output Area for Northern Ireland
SOAs are a geography hierarchy designed to improve the reporting of small-area statistics. In England and Wales Lower Layer SOAs (LSOA) with a minimum population of 1,000 and Middle Layer SOAs (MSOA) with a minimum population of 5,000 were introduced in 2004. LSOAs are of consistent size across the country and won’t be subject to regular boundary change. A decision was made not to create an Upper Layer in England, while in Wales an Upper Layer (USOA) was created. In Northern Ireland there is a single layer of SOAs, with a minimum population of 1,300. The Scottish equivalents of SOAs are Data Zones (DZ) with a minimum population of 500 and Intermediate Zones (IZ) with a minimum population of 2,500.
Area_Number: 1 to 228
Area_Code: Travel-To-Work-Area Code
Area_Name: Travel-To-Work-Area Name
Note:
By reading in and showing the data, we can get a feel for the data that is held in the files.
ttw_areas = spark.sql('SELECT * FROM travel_to_work_areas')
ttw_areas.show(10)
## +---------------+-----------+---------+---------+
## |Small_Area_Code|Area_Number|Area_Code|Area_Name|
## +---------------+-----------+---------+---------+
## | 95AA01S1| 18|N12000002| Belfast|
## | 95AA01S2| 18|N12000002| Belfast|
## | 95AA01S3| 18|N12000002| null|
## | 95AA02W1| 18|N12000002| Belfast|
## | 95AA03W1| 18|N12000002| null|
## | 95AA04W1| 18|N12000002| Belfast|
## | 95AA05W1| 9|N12000001|Ballymena|
## | 95AA06S1| 18|N12000002| Belfast|
## | 95AA06S2| 18|N12000002| Belfast|
## | 95AA07W1| 9|N12000001|Ballymena|
## +---------------+-----------+---------+---------+
## only showing top 10 rows
print(ttw_areas.dtypes)
## [('Small_Area_Code', 'string'), ('Area_Number', 'int'), ('Area_Code', 'string'), ('Area_Name', 'string')]
ttw_small_areas = spark.sql('SELECT * FROM travel_to_work_small_areas')
ttw_small_areas.show(10)
## +---------------+----------------+
## |Small_Area_Code| Small_Area_Name|
## +---------------+----------------+
## | 95AA01S1| Aldergrove 1|
## | 95AA01S2| Aldergrove 2|
## | 95AA01S3| Aldergrove 3|
## | 95AA02W1| Balloo|
## | 95AA03W1| Ballycraigy|
## | 95AA04W1| Clady|
## | 95AA05W1| Cranfield|
## | 95AA06S1|Crumlin 1 Antrim|
## | 95AA06S2|Crumlin 2 Antrim|
## | 95AA07W1| Drumanaway|
## +---------------+----------------+
## only showing top 10 rows
print(ttw_small_areas.dtypes)
## [('Small_Area_Code', 'string'), ('Small_Area_Name', 'string')]
If there are duplicates in the data, we want to catch them early. Let's count the data, drop the duplicates, and count again. If the counts differ, then duplicates existed in the original data but have now been removed.
print(ttw_areas.count(), ttw_small_areas.count())
## 453 453
ttw_areas_temp = ttw_areas.dropDuplicates()
ttw_small_areas_temp = ttw_small_areas.dropDuplicates()
print(ttw_areas_temp.count(), ttw_small_areas_temp.count())
## 452 452
As there were duplicates, we can overwrite our original data with the temp DataFrames with the duplicates removed.
ttw_areas = ttw_areas_temp
ttw_small_areas = ttw_small_areas_temp
There is one column common to both DataFrames, so we will join on that - Small_Area_Code. From inspection it looks like matching records are found in both DataFrames, so the type of join shouldn't matter. However, we will do a left join, keeping all data in travel_to_work_areas, as that data has some meaning without the small-area data, while the reverse is not true.
ttw_joined = ttw_areas.join(ttw_small_areas, on = 'Small_Area_Code', how = 'left')
print(ttw_joined.count())
## 452
This also has the correct number of records, a good sign!
Let's look for nulls. .describe() returns the count of non-null values in each column. If that differs from the count found above, we know there are nulls in that column.
ttw_joined.describe().show()
## +-------+---------------+------------------+---------+-------------------+---------------+
## |summary|Small_Area_Code| Area_Number|Area_Code| Area_Name|Small_Area_Name|
## +-------+---------------+------------------+---------+-------------------+---------------+
## | count| 452| 452| 451| 448| 452|
## | mean| null| 33.84070796460177| null| null| null|
## | stddev| null|29.214040414996887| null| null| null|
## | min| 95AA01S1| 9|N12000001| Ballymena| Abbey Park|
## | max| 95LL26S2| 152|N12000009|Newry and Banbridge| Wynchurch|
## +-------+---------------+------------------+---------+-------------------+---------------+
We can see that there are nulls in two columns - Area_Code and Area_Name, with 1 and 4 nulls respectively. Let's have a look at these records:
ttw_joined.where((ttw_joined['Area_Code'].isNull() | ttw_joined['Area_Name'].isNull())).show()
## +---------------+-----------+---------+---------+----------------+
## |Small_Area_Code|Area_Number|Area_Code|Area_Name| Small_Area_Name|
## +---------------+-----------+---------+---------+----------------+
## | 95CC09S2| 63|N12000005| null| Hamiltonsbawn 2|
## | 95AA01S3| 18|N12000002| null| Aldergrove 3|
## | 95AA10W1| 18| null| Belfast|Greystone Antrim|
## | 95CC07W1| 63|N12000005| null| Derrynoose|
## | 95AA03W1| 18|N12000002| null| Ballycraigy|
## +---------------+-----------+---------+---------+----------------+
Can we use any data from other columns to impute our missing values?
When we showed the first 10 records above, it looked like there is a relationship between Area_Code and Area_Name - hopefully a one-to-one relationship. Let's take a look, and run a few aggregates to check.
ttw_joined.where(ttw_joined['Area_Name'] == 'Belfast').groupby('Area_Code').count().show()
## +---------+-----+
## |Area_Code|count|
## +---------+-----+
## | null| 1|
## |N12000002| 268|
## +---------+-----+
ttw_joined.where(ttw_joined['Area_Code'] == 'N12000002').groupby('Area_Name').count().show()
## +---------+-----+
## |Area_Name|count|
## +---------+-----+
## | Belfast| 268|
## | null| 2|
## +---------+-----+
ttw_joined.where(ttw_joined['Area_Code'] == 'N12000005').groupby('Area_Name').count().show()
## +---------+-----+
## |Area_Name|count|
## +---------+-----+
## |Craigavon| 67|
## | null| 2|
## +---------+-----+
This shows us that we should be able to impute the missing data. If in doubt, ask a subject matter expert who knows the data well. The filtered data above shows that every Belfast record has an Area_Code of N12000002, apart from the nulls, and the same for the two codes and their respective Area_Names.
Now we know what to fill our nulls with, we can do so. There are many different ways of doing this. A mapper could be created showing the one-to-one relationship between Area_Name and Area_Code, joined onto the original data, and the new columns combined with the old. In this case, because we have so few values to impute, .when().otherwise() can be used three times to fix the missing values.
from pyspark.sql.functions import when
ttw_joined = ttw_joined.withColumn('Area_Code', when(ttw_joined['Area_Name'] == 'Belfast', 'N12000002').otherwise(ttw_joined['Area_Code']))
ttw_joined = ttw_joined.withColumn('Area_Name', when(ttw_joined['Area_Code'] == 'N12000002', 'Belfast').otherwise(ttw_joined['Area_Name']))
ttw_joined = ttw_joined.withColumn('Area_Name', when(ttw_joined['Area_Code'] == 'N12000005', 'Craigavon').otherwise(ttw_joined['Area_Name']))
Let us check that this has fixed the null values.
ttw_joined.where(ttw_joined['Area_Name'] == 'Belfast').groupby('Area_Code').count().show()
## +---------+-----+
## |Area_Code|count|
## +---------+-----+
## |N12000002| 271|
## +---------+-----+
ttw_joined.where(ttw_joined['Area_Code'] == 'N12000002').groupby('Area_Name').count().show()
## +---------+-----+
## |Area_Name|count|
## +---------+-----+
## | Belfast| 271|
## +---------+-----+
ttw_joined.where(ttw_joined['Area_Code'] == 'N12000005').groupby('Area_Name').count().show()
## +---------+-----+
## |Area_Name|count|
## +---------+-----+
## |Craigavon| 69|
## +---------+-----+
There are no nulls, and the counts are as expected. One last describe will check the overall column status.
ttw_joined.describe().show()
## +-------+---------------+------------------+---------+-------------------+---------------+
## |summary|Small_Area_Code| Area_Number|Area_Code| Area_Name|Small_Area_Name|
## +-------+---------------+------------------+---------+-------------------+---------------+
## | count| 452| 452| 452| 452| 452|
## | mean| null| 33.84070796460177| null| null| null|
## | stddev| null|29.214040414996887| null| null| null|
## | min| 95AA01S1| 9|N12000001| Ballymena| Abbey Park|
## | max| 95LL26S2| 152|N12000009|Newry and Banbridge| Wynchurch|
## +-------+---------------+------------------+---------+-------------------+---------------+
The original question, beyond all the cleaning, is to display the count of small areas within each travel to work area. This can be done with a simple groupby.
ttw_joined.groupby('Area_Name').count().show()
## +--------------------+-----+
## | Area_Name|count|
## +--------------------+-----+
## | Ballymena| 37|
## | Craigavon| 69|
## | Belfast| 271|
## | Dungannon| 2|
## | Coleraine| 43|
## | Newry and Banbridge| 14|
## |Cookstown and Mag...| 16|
## +--------------------+-----+
Given the dataset, investigate whether the current partitioning of the DataFrames is efficient and we are not wasting resources. Justify and perform any action as needed.
What happens if you coalesce to 8 partitions, or repartition to 2000? How long do simple actions take once the data has been transformed in this way?
Hint:
One of the main reasons to use RDDs is when you need finer control of your data. As each DataFrame is built on RDDs, you can access all RDD methods simply by calling df.rdd.method_name.
Two RDD methods are especially useful here: find the number of partitions with df.rdd.getNumPartitions(), and get a breakdown of the number of records in each partition with df.rdd.glom().map(len).collect().
Databricks cells state at the bottom how long they take to run. As such, a command such as .count() can be run in a new cell and the time easily observed. If we change the number of partitions to extreme values and rerun these commands, we can get a feel for how the number of partitions should be tailored to the data.
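Outside Databricks (in CDSW, for example) the same measurement can be taken by hand with Python's time module. A minimal sketch, with a plain Python sum standing in for a Spark action such as .count():

```python
import time

start = time.time()
result = sum(range(1_000_000))  # stand-in for a Spark action such as df.count()
elapsed = time.time() - start

print(f'action took {elapsed:.2f} seconds')
```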
Let us do a few different things. We'll try raising the number of partitions to a very high number, lowering it to a very small number, and keeping the default. For each variant we'll run a count, to see how long it takes. Assuming the original data is not very large, the obvious hypothesis is that with a very large number of partitions the process will slow down, as the system has to deal with the overhead of each of those partitions while they won't hold much data.
Working from the joined travel to work data - ttw_joined - we know we have 452 records. Let's use an RDD method to find the number of partitions.
print(ttw_joined.rdd.getNumPartitions())
## 200
200 partitions is probably not a good number of partitions to split 452 records into - this is about 2 records per partition!
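A common rule of thumb - an assumption here, not a figure fixed by Spark - is to aim for a small multiple of the number of available cores, so that every core has work and no partition is left nearly empty. With a hypothetical 4 cores:

```python
records = 452    # from ttw_joined.count()
cores = 4        # hypothetical number of cores available
partitions = 2 * cores           # e.g. twice the core count -> 8 partitions

print(records / partitions)  # 56.5 records per partition on average
```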
Let's run a simple action in a new cell - such as a .count() - and see how long it takes to run.
print(ttw_joined.count())
## 452
For me, this took 1.02 seconds.
Let us also find the lengths of each partition, and some simple statistics: the minimum and maximum number of records per partition, the average, and the standard deviation of these values.
import numpy as np
l = ttw_joined.rdd.glom().map(len).collect()
print(l)
## [2, 3, 2, 2, 0, 4, 4, 2, 3, 2, 0, 2, 3, 3, 3, 2, 2, 2, 3, 2, 1, 3, 0, 2, 4, 2, 2, 2, 4, 2, 0, 3, 1, 2, 2, 3, 3, 4, 1, 3, 0, 3, 2, 2, 3, 3, 5, 3, 1, 2, 1, 1, 3, 2, 4, 5, 4, 1, 3, 1, 2, 2, 1, 2, 3, 1, 3, 4, 0, 1, 2, 3, 2, 2, 5, 2, 0, 1, 3, 1, 4, 3, 2, 5, 3, 2, 1, 1, 1, 2, 1, 1, 0, 1, 3, 1, 2, 3, 2, 0, 4, 1, 3, 2, 1, 1, 2, 2, 1, 2, 1, 2, 4, 1, 2, 0, 2, 0, 3, 2, 1, 3, 1, 3, 2, 5, 3, 5, 2, 1, 3, 1, 0, 0, 3, 4, 2, 3, 2, 2, 7, 2, 2, 1, 2, 4, 2, 2, 1, 2, 3, 1, 0, 3, 1, 4, 0, 4, 2, 6, 6, 3, 5, 2, 4, 2, 4, 3, 3, 1, 0, 2, 3, 2, 2, 0, 4, 3, 5, 4, 5, 1, 1, 2, 2, 1, 3, 2, 4, 3, 3, 1, 2, 3, 1, 1, 3, 4, 2, 2]
print(min(l), max(l), sum(l)/len(l), np.std(l))
## 0 7 2.26 1.3462540622037136
Some partitions are empty, while others have 7 records. Ideally every partition would hold about the average of 2.26 records - not counts ranging anywhere from 0 to 7!
We can use another inbuilt method - a Counter, to find out how many partitions there are with each of 0 to 7 records on them:
from collections import Counter
print(Counter(l))
## Counter({2: 64, 3: 45, 1: 41, 4: 21, 0: 17, 5: 9, 6: 2, 7: 1})
Let us bring the number of partitions down to 8. To do this, we can use either coalesce(8) or repartition(8). First, take a look at the new partition structure.
ttw_8_part = ttw_joined.coalesce(8)
l = ttw_8_part.rdd.glom().map(len).collect()
print(min(l), max(l), sum(l)/len(l), np.std(l))
## 45 66 56.5 6.96419413859206
print(Counter(l))
## Counter({58: 2, 56: 1, 45: 1, 46: 1, 61: 1, 66: 1, 62: 1})
And again, in a new cell, run a count.
print(ttw_8_part.count())
## 452
For me, this took 0.17 seconds, less than a fifth of the original time! We also have no empty partitions, and while not perfect, the partitioning isn’t terrible. With a longer process with multiple transformations, or a much larger data set, this will really add up.
Now, let's raise the number of partitions, this time to 2000. Immediately this seems like a bad idea - with only 452 records there will be at least 1548 empty partitions! As we want to increase the number of partitions, we need to use repartition(2000).
ttw_2000_part = ttw_joined.repartition(2000)
l = ttw_2000_part.rdd.glom().map(len).collect()
print(min(l), max(l), sum(l)/len(l), np.std(l))
## 0 2 0.226 0.42181038394046205
print(Counter(l))
## Counter({0: 1551, 1: 446, 2: 3})
Clearly this data is not well distributed. The vast majority of partitions are empty, while almost all of the non-empty partitions hold just a single record.
Finally, in a new cell, we run a count.
print(ttw_2000_part.count())
## 452
For me this took 3.24 seconds - around three times longer than the original, and roughly 19 times longer than the more efficient 8 partitions.
Now, you should:
Feel confident to deal with Data using pyspark.
Be able to combine what we learnt in the previous Chapters to investigate and manipulate Data.
Be able to investigate the current partitioning of your DataFrame, calculate the optimal partitioning based on your DataFrame and your resources, and make the necessary changes to achieve it.
PySpark documentation http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html
Bill Chambers and Matei Zaharia, Spark: The Definitive Guide, O’Reilly, 2018 https://drive.google.com/file/d/1pCtRc2oantUBBsDluEXQ9qQoewcuuzrZ/view
CheatSheet: https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf
Spark’s Machine learning library https://spark.apache.org/docs/2.3.1/api/python/pyspark.ml.html/
Graphs and graph-parallel computation https://spark.apache.org/graphx/
Spark Streaming https://spark.apache.org/streaming/
Using Scala in Spark
Datasets
Advanced Topics on Partitioning and Optimisation
Resilient Distributed Datasets (RDDs)